Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
… into fix-10029
  • Loading branch information
Rishikesh1159 committed Oct 20, 2023
2 parents e07dc80 + 3ebdd12 commit f98e3a2
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 37 deletions.
1 change: 1 addition & 0 deletions .ci/bwcVersions
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ BWC_VERSION:
- "2.10.0"
- "2.10.1"
- "2.11.0"
- "2.11.1"
- "2.12.0"
1 change: 1 addition & 0 deletions libs/core/src/main/java/org/opensearch/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
public static final Version V_2_10_0 = new Version(2100099, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_10_1 = new Version(2100199, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_11_0 = new Version(2110099, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_11_1 = new Version(2110199, org.apache.lucene.util.Version.LUCENE_9_7_0);
public static final Version V_2_12_0 = new Version(2120099, org.apache.lucene.util.Version.LUCENE_9_8_0);
public static final Version V_3_0_0 = new Version(3000099, org.apache.lucene.util.Version.LUCENE_9_8_0);
public static final Version CURRENT = V_3_0_0;
Expand Down
67 changes: 40 additions & 27 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1608,8 +1608,11 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}

/**
* Compute and return the latest ReplicationCheckpoint for a particular shard.
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
* Returns the most recently computed ReplicationCheckpoint for a particular shard.
* The checkpoint is updated inside a refresh listener and may lag behind the SegmentInfos on the reader.
* To guarantee the checkpoint is up to date with the latest on-reader infos, use {@link #getLatestSegmentInfosAndCheckpoint()} instead.
*
* @return {@link ReplicationCheckpoint} - The most recently computed ReplicationCheckpoint.
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return replicationTracker.getLatestReplicationCheckpoint();
Expand All @@ -1628,34 +1631,12 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
assert indexSettings.isSegRepEnabled();

Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>(
new GatedCloseable<>(null, () -> {}),
getLatestReplicationCheckpoint()
);

if (getEngineOrNull() == null) {
return nullSegmentInfosEmptyCheckpoint;
}
// do not close the snapshot - caller will close it.
GatedCloseable<SegmentInfos> snapshot = null;
try {
snapshot = getSegmentInfosSnapshot();
if (snapshot.get() != null) {
SegmentInfos segmentInfos = snapshot.get();
final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
segmentInfos.getVersion(),
metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
getEngine().config().getCodec().getName(),
metadataMap
)
);
}
final SegmentInfos segmentInfos = snapshot.get();
return new Tuple<>(snapshot, computeReplicationCheckpoint(segmentInfos));
} catch (IOException | AlreadyClosedException e) {
logger.error("Error Fetching SegmentInfos and latest checkpoint", e);
if (snapshot != null) {
Expand All @@ -1666,7 +1647,39 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
}
}
}
return nullSegmentInfosEmptyCheckpoint;
return new Tuple<>(new GatedCloseable<>(null, () -> {}), getLatestReplicationCheckpoint());
}

/**
 * Compute the latest {@link ReplicationCheckpoint} from a SegmentInfos.
 * Building a checkpoint fetches a segment metadata map from the store, which comes with an IO cost,
 * so the most recently stored checkpoint is returned unchanged when it already describes the given infos.
 * The stored checkpoint is only reused when its SegmentInfos version, its segments generation and its
 * primary term all match the current values; otherwise a new checkpoint is built.
 *
 * @param segmentInfos {@link SegmentInfos} infos to use to compute; may be null.
 * @return {@link ReplicationCheckpoint} the stored checkpoint when it matches the infos, a newly computed
 *         checkpoint otherwise, or an empty checkpoint for this shard when {@code segmentInfos} is null.
 * @throws IOException When there is an error computing segment metadata from the store.
 */
ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) throws IOException {
    if (segmentInfos == null) {
        return ReplicationCheckpoint.empty(shardId);
    }
    // Read the term once so the reuse check and a recomputed checkpoint agree on the same value.
    final long operationPrimaryTerm = getOperationPrimaryTerm();
    final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint();
    // Version and generation alone do not identify a checkpoint: one stored under an older primary term
    // must not be handed out as current, so the term is part of the reuse condition as well.
    // The null guard is defensive in case no checkpoint has been stored for this shard.
    if (latestReplicationCheckpoint != null
        && latestReplicationCheckpoint.getSegmentInfosVersion() == segmentInfos.getVersion()
        && latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()
        && latestReplicationCheckpoint.getPrimaryTerm() == operationPrimaryTerm) {
        return latestReplicationCheckpoint;
    }
    // Slow path: read per-file metadata from the store and build a fresh checkpoint.
    final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
    final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(
        this.shardId,
        operationPrimaryTerm,
        segmentInfos.getGeneration(),
        segmentInfos.getVersion(),
        // total length in bytes of all files referenced by the metadata map
        metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
        getEngine().config().getCodec().getName(),
        metadataMap
    );
    logger.trace("Recomputed ReplicationCheckpoint for shard {}", checkpoint);
    return checkpoint;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ private boolean syncSegments() {
// in the remote store.
return indexShard.state() != IndexShardState.STARTED || !(indexShard.getEngine() instanceof InternalEngine);
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
beforeSegmentsSync();
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
Expand All @@ -199,10 +198,7 @@ private boolean syncSegments() {

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
assert segmentInfos.getGeneration() == checkpoint.getSegmentsGen() : "SegmentInfos generation: "
+ segmentInfos.getGeneration()
+ " does not match metadata generation: "
+ checkpoint.getSegmentsGen();
final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos);
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public DateTime(DateFormatter formatter, ZoneId timeZone, DateFieldMapper.Resolu
}

public DateTime(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
this.formatter = DateFormatter.forPattern(in.readString(), in.readOptionalString());
} else {
this.formatter = DateFormatter.forPattern(in.readString());
Expand All @@ -265,12 +265,12 @@ public String getWriteableName() {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_3_0_0) && formatter.equals(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER)) {
if (out.getVersion().before(Version.V_2_12_0) && formatter.equals(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER)) {
out.writeString(DateFieldMapper.LEGACY_DEFAULT_DATE_TIME_FORMATTER.pattern()); // required for backwards compatibility
} else {
out.writeString(formatter.pattern());
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalString(formatter.printPattern());
}
out.writeString(timeZone.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,8 +520,8 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
if (counter.incrementAndGet() <= succeedOnAttempt) {
throw new RuntimeException("Inducing failure in upload");
}
return indexShard.getLatestSegmentInfosAndCheckpoint();
})).when(shard).getLatestSegmentInfosAndCheckpoint();
return indexShard.getLatestReplicationCheckpoint();
})).when(shard).computeReplicationCheckpoint(any());

doAnswer(invocation -> {
if (Objects.nonNull(successLatch)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,33 @@ public void testSnapshotWhileFailoverIncomplete() throws Exception {
}
}

/**
 * Indexes documents, refreshes and replicates, then checks that computing a checkpoint from the
 * primary's current SegmentInfos yields a checkpoint equal to the one the shard already holds.
 * The check is done twice: once with a plain SegmentInfos snapshot and once with the infos
 * returned by {@code getLatestSegmentInfosAndCheckpoint}.
 */
public void testReuseReplicationCheckpointWhenLatestInfosIsUnChanged() throws Exception {
    final int docCount = 10;
    try (ReplicationGroup group = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) {
        final IndexShard primary = group.getPrimary();
        group.startAll();
        group.indexDocs(docCount);
        group.refresh("test");
        replicateSegments(primary, group.getReplicas());
        group.assertAllEqual(docCount);

        // Checkpoint held by the shard after the refresh; both computations below must equal it.
        final ReplicationCheckpoint expected = primary.getLatestReplicationCheckpoint();

        try (GatedCloseable<SegmentInfos> infosRef = primary.getSegmentInfosSnapshot()) {
            final ReplicationCheckpoint fromSnapshot = primary.computeReplicationCheckpoint(infosRef.get());
            assertEquals(expected, fromSnapshot);
        }

        final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> infosAndCheckpoint = primary.getLatestSegmentInfosAndCheckpoint();
        try (GatedCloseable<SegmentInfos> infosRef = infosAndCheckpoint.v1()) {
            final ReplicationCheckpoint fromLatestInfos = primary.computeReplicationCheckpoint(infosRef.get());
            assertEquals(expected, fromLatestInfos);
        }
    }
}

/**
 * Checks that passing a null SegmentInfos to {@code computeReplicationCheckpoint} yields a checkpoint
 * equal to {@code ReplicationCheckpoint.empty} for the primary's shard id.
 */
public void testComputeReplicationCheckpointNullInfosReturnsEmptyCheckpoint() throws Exception {
    try (ReplicationGroup group = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) {
        final IndexShard primary = group.getPrimary();
        final ReplicationCheckpoint expectedEmpty = ReplicationCheckpoint.empty(primary.shardId);
        final ReplicationCheckpoint computed = primary.computeReplicationCheckpoint(null);
        assertEquals(expectedEmpty, computed);
    }
}

private SnapshotShardsService getSnapshotShardsService(IndexShard replicaShard) {
final TransportService transportService = mock(TransportService.class);
when(transportService.getThreadPool()).thenReturn(threadPool);
Expand Down

0 comments on commit f98e3a2

Please sign in to comment.