Skip to content

Commit

Permalink
[Remote Store] Fix tests when we restore index without any refresh (o…
Browse files Browse the repository at this point in the history
…pensearch-project#9480)

Signed-off-by: Sachin Kale <[email protected]>
Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
sachinpkale authored and shiv0408 committed Apr 25, 2024
1 parent a57a1d2 commit cd9f414
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ protected Map<String, Long> indexData(int numberOfIterations, boolean invokeFlus
Map<String, Long> indexingStats = new HashMap<>();
for (int i = 0; i < numberOfIterations; i++) {
if (invokeFlush) {
flush(index);
flushAndRefresh(index);
} else {
refresh(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class RemoteStoreRestoreIT extends RemoteStoreBaseIntegTestCase {
private static final String TOTAL_OPERATIONS = "total-operations";
private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations";
private static final String MAX_SEQ_NO_TOTAL = "max-seq-no-total";
private static final String MAX_SEQ_NO_REFRESHED_OR_FLUSHED = "max-seq-no-refreshed-or-flushed";

@Override
public Settings indexSettings() {
Expand Down Expand Up @@ -68,18 +67,18 @@ private void restore(String... indices) {
);
}

private void verifyRestoredData(Map<String, Long> indexStats, boolean checkTotal, String indexName) {
private void verifyRestoredData(Map<String, Long> indexStats, String indexName) {
// This is required to get updated number from already active shards which were not restored
refresh(indexName);
String statsGranularity = checkTotal ? TOTAL_OPERATIONS : REFRESHED_OR_FLUSHED_OPERATIONS;
String maxSeqNoGranularity = checkTotal ? MAX_SEQ_NO_TOTAL : MAX_SEQ_NO_REFRESHED_OR_FLUSHED;
ensureYellowAndNoInitializingShards(indexName);
ensureGreen(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity));
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS));
IndexResponse response = indexSingleDoc(indexName);
assertEquals(indexStats.get(maxSeqNoGranularity + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo());
if (indexStats.containsKey(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id())) {
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL + "-shard-" + response.getShardId().id()) + 1, response.getSeqNo());
}
refresh(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(statsGranularity) + 1);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1);
}

private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
Expand All @@ -96,7 +95,6 @@ private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, St
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
public void testRemoteTranslogRestoreWithNoDataPostCommit() throws IOException {
testRestoreFlow(1, true, randomIntBetween(1, 5));
}
Expand Down Expand Up @@ -131,7 +129,6 @@ public void testRemoteTranslogRestoreWithCommittedData() throws IOException {
* Simulates all data restored using Remote Translog Store.
* @throws IOException IO Exception.
*/
// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8479")
public void testRTSRestoreWithNoDataPostCommitPrimaryReplicaDown() throws IOException {
testRestoreFlowBothPrimaryReplicasDown(1, true, randomIntBetween(1, 5));
Expand Down Expand Up @@ -172,7 +169,7 @@ private void restoreAndVerify(int shardCount, int replicaCount, Map<String, Long
// This is required to get updated number from already active shards which were not restored
assertEquals(shardCount * (1 + replicaCount), getNumShards(INDEX_NAME).totalNumShards);
assertEquals(replicaCount, getNumShards(INDEX_NAME).numReplicas);
verifyRestoredData(indexStats, true, INDEX_NAME);
verifyRestoredData(indexStats, INDEX_NAME);
}

/**
Expand All @@ -186,6 +183,8 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, int sh
Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush, INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);

assertHitCount(client().prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(REFRESHED_OR_FLUSHED_OPERATIONS));

internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
ensureRed(INDEX_NAME);

Expand Down Expand Up @@ -256,7 +255,7 @@ private void testRestoreFlowMultipleIndices(int numberOfIterations, boolean invo
ensureGreen(indices);
for (String index : indices) {
assertEquals(shardCount, getNumShards(index).totalNumShards);
verifyRestoredData(indicesStats.get(index), true, index);
verifyRestoredData(indicesStats.get(index), index);
}
}

Expand Down Expand Up @@ -288,7 +287,7 @@ public void testRestoreFlowNoRedIndex() {

ensureGreen(INDEX_NAME);
assertEquals(shardCount, getNumShards(INDEX_NAME).totalNumShards);
verifyRestoredData(indexStats, true, INDEX_NAME);
verifyRestoredData(indexStats, INDEX_NAME);
}

/**
Expand Down Expand Up @@ -340,7 +339,7 @@ public void testRTSRestoreWithCommittedDataDefaultAllIndices() throws IOExceptio

for (String index : indices) {
assertEquals(shardCount, getNumShards(index).totalNumShards);
verifyRestoredData(indicesStats.get(index), true, index);
verifyRestoredData(indicesStats.get(index), index);
}
}

Expand Down Expand Up @@ -384,9 +383,9 @@ public void testRTSRestoreWithCommittedDataNotAllRedRemoteIndices() throws IOExc
);
ensureGreen(indices[0], indices[1]);
assertEquals(shardCount, getNumShards(indices[0]).totalNumShards);
verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]);
verifyRestoredData(indicesStats.get(indices[0]), indices[0]);
assertEquals(shardCount, getNumShards(indices[1]).totalNumShards);
verifyRestoredData(indicesStats.get(indices[1]), true, indices[1]);
verifyRestoredData(indicesStats.get(indices[1]), indices[1]);
ensureRed(indices[2], indices[3]);
}

Expand Down Expand Up @@ -436,9 +435,9 @@ public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOExc
);
ensureGreen(indices[0], indices[1]);
assertEquals(shardCount, getNumShards(indices[0]).totalNumShards);
verifyRestoredData(indicesStats.get(indices[0]), true, indices[0]);
verifyRestoredData(indicesStats.get(indices[0]), indices[0]);
assertEquals(shardCount, getNumShards(indices[1]).totalNumShards);
verifyRestoredData(indicesStats.get(indices[1]), true, indices[1]);
verifyRestoredData(indicesStats.get(indices[1]), indices[1]);
ensureRed(indices[2], indices[3]);
}

Expand All @@ -447,8 +446,7 @@ public void testRTSRestoreWithCommittedDataExcludeIndicesPatterns() throws IOExc
* when the index has no data.
* @throws IOException IO Exception.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6188")
public void testRTSRestoreNoData() throws IOException {
public void testRTSRestoreDataOnlyInTranslog() throws IOException {
testRestoreFlow(0, true, randomIntBetween(1, 5));
}

Expand Down
20 changes: 14 additions & 6 deletions server/src/main/java/org/opensearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Checkpoint;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogHeader;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.repositories.IndexId;
Expand All @@ -74,6 +76,8 @@

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand All @@ -83,6 +87,7 @@
import java.util.stream.Collectors;

import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.index.translog.Translog.CHECKPOINT_FILE_NAME;

/**
* This package private utility class encapsulates the logic to recover an index shard from either an existing index on
Expand Down Expand Up @@ -532,13 +537,16 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
// Download segments from remote segment store
indexShard.syncSegmentsFromRemoteSegmentStore(true, true);

indexShard.syncTranslogFilesFromRemoteTranslog();

if (store.directory().listAll().length == 0) {
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
}
if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) {
indexShard.syncTranslogFilesFromRemoteTranslog();
} else {
bootstrap(indexShard, store);
Path location = indexShard.shardPath().resolveTranslog();
Checkpoint checkpoint = Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
final Path translogFile = location.resolve(Translog.getFilename(checkpoint.getGeneration()));
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
TranslogHeader translogHeader = TranslogHeader.read(translogFile, channel);
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion, translogHeader.getTranslogUUID());
}
}

assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
Expand Down
15 changes: 11 additions & 4 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -1747,13 +1747,13 @@ public void accept(ShardLock Lock) {}
};
}

/**
* creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted.
*/
public void createEmpty(Version luceneVersion) throws IOException {
public void createEmpty(Version luceneVersion, String translogUUID) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newEmptyIndexWriter(directory, luceneVersion)) {
final Map<String, String> map = new HashMap<>();
if (translogUUID != null) {
map.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
}
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
Expand All @@ -1764,6 +1764,13 @@ public void createEmpty(Version luceneVersion) throws IOException {
}
}

/**
* creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted.
*/
public void createEmpty(Version luceneVersion) throws IOException {
createEmpty(luceneVersion, null);
}

/**
* Marks an existing lucene index with a new history uuid.
* This is used to make sure no existing shard will recovery from this index using ops based recovery.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
*
* @opensearch.internal
*/
final class TranslogHeader {
public final class TranslogHeader {
public static final String TRANSLOG_CODEC = "translog";

public static final int VERSION_CHECKSUMS = 1; // pre-2.0 - unsupported
Expand Down Expand Up @@ -137,9 +137,26 @@ static int readHeaderVersion(final Path path, final FileChannel channel, final S
}

/**
* Read a translog header from the given path and file channel
* Read a translog header from the given path and file channel and compare the given UUID
*/
static TranslogHeader read(final String translogUUID, final Path path, final FileChannel channel) throws IOException {
TranslogHeader translogHeader = read(path, channel);
// verify UUID only after checksum, to ensure that UUID is not corrupted
final BytesRef expectedUUID = new BytesRef(translogUUID);
final BytesRef actualUUID = new BytesRef(translogHeader.translogUUID);
if (actualUUID.bytesEquals(expectedUUID) == false) {
throw new TranslogCorruptedException(
path.toString(),
"expected shard UUID " + expectedUUID + " but got: " + actualUUID + " this translog file belongs to a different translog"
);
}
return translogHeader;
}

/**
* Read a translog header from the given path and file channel and compare the given UUID
*/
public static TranslogHeader read(final Path path, final FileChannel channel) throws IOException {
try {
// This input is intentionally not closed because closing it will close the FileChannel.
final BufferedChecksumStreamInput in = new BufferedChecksumStreamInput(
Expand Down Expand Up @@ -179,16 +196,7 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil
+ channel.position()
+ "]";

// verify UUID only after checksum, to ensure that UUID is not corrupted
final BytesRef expectedUUID = new BytesRef(translogUUID);
if (uuid.bytesEquals(expectedUUID) == false) {
throw new TranslogCorruptedException(
path.toString(),
"expected shard UUID " + expectedUUID + " but got: " + uuid + " this translog file belongs to a different translog"
);
}

return new TranslogHeader(translogUUID, primaryTerm, headerSizeInBytes);
return new TranslogHeader(uuid.utf8ToString(), primaryTerm, headerSizeInBytes);
} catch (EOFException e) {
throw new TranslogCorruptedException(path.toString(), "translog header truncated", e);
}
Expand Down
37 changes: 37 additions & 0 deletions server/src/test/java/org/opensearch/index/store/StoreTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.test.DummyShardLock;
Expand Down Expand Up @@ -1166,6 +1167,42 @@ public void testGetMetadataWithSegmentInfos() throws IOException {
store.close();
}

public void testCreateEmptyStore() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId));
store.createEmpty(Version.LATEST);
SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
assertFalse(segmentInfos.getUserData().containsKey(Translog.TRANSLOG_UUID_KEY));
testDefaultUserData(segmentInfos);
store.close();
}

public void testCreateEmptyStoreWithTranlogUUID() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId));
store.createEmpty(Version.LATEST, "dummy-translog-UUID");
SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
assertEquals("dummy-translog-UUID", segmentInfos.getUserData().get(Translog.TRANSLOG_UUID_KEY));
testDefaultUserData(segmentInfos);
store.close();
}

public void testCreateEmptyWithNullTranlogUUID() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
Store store = new Store(shardId, INDEX_SETTINGS, new NIOFSDirectory(createTempDir()), new DummyShardLock(shardId));
store.createEmpty(Version.LATEST, null);
SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
assertFalse(segmentInfos.getUserData().containsKey(Translog.TRANSLOG_UUID_KEY));
testDefaultUserData(segmentInfos);
store.close();
}

private void testDefaultUserData(SegmentInfos segmentInfos) {
assertEquals("-1", segmentInfos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
assertEquals("-1", segmentInfos.getUserData().get(SequenceNumbers.MAX_SEQ_NO));
assertEquals("-1", segmentInfos.getUserData().get(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID));
}

public void testGetSegmentMetadataMap() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
Store store = new Store(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,49 @@ public void testHeaderWithoutPrimaryTerm() throws Exception {
});
}

public void testCurrentHeaderVersionWithoutUUIDComparison() throws Exception {
final String translogUUID = UUIDs.randomBase64UUID();
final TranslogHeader outHeader = new TranslogHeader(translogUUID, randomNonNegativeLong());
final long generation = randomNonNegativeLong();
final Path translogFile = createTempDir().resolve(Translog.getFilename(generation));
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
outHeader.write(channel, true);
assertThat(outHeader.sizeInBytes(), equalTo((int) channel.position()));
}
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
final TranslogHeader inHeader = TranslogHeader.read(translogFile, channel);
assertThat(inHeader.getTranslogUUID(), equalTo(translogUUID));
assertThat(inHeader.getPrimaryTerm(), equalTo(outHeader.getPrimaryTerm()));
assertThat(inHeader.sizeInBytes(), equalTo((int) channel.position()));
}

TestTranslog.corruptFile(logger, random(), translogFile, false);
final TranslogCorruptedException corruption = expectThrows(TranslogCorruptedException.class, () -> {
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
final TranslogHeader translogHeader = TranslogHeader.read(translogFile, channel);
assertThat(
"version " + TranslogHeader.VERSION_CHECKPOINTS + " translog",
translogHeader.getPrimaryTerm(),
equalTo(SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
);
throw new TranslogCorruptedException(translogFile.toString(), "adjusted translog version");
} catch (IllegalStateException e) {
// corruption corrupted the version byte making this look like a v2, v1 or v0 translog
assertThat(
"version " + TranslogHeader.VERSION_CHECKPOINTS + "-or-earlier translog",
e.getMessage(),
anyOf(
containsString("pre-2.0 translog found"),
containsString("pre-1.4 translog found"),
containsString("pre-6.3 translog found")
)
);
throw new TranslogCorruptedException(translogFile.toString(), "adjusted translog version", e);
}
});
assertThat(corruption.getMessage(), not(containsString("this translog file belongs to a different translog")));
}

static void writeHeaderWithoutTerm(FileChannel channel, String translogUUID) throws IOException {
final OutputStreamStreamOutput out = new OutputStreamStreamOutput(Channels.newOutputStream(channel));
CodecUtil.writeHeader(new OutputStreamDataOutput(out), TranslogHeader.TRANSLOG_CODEC, TranslogHeader.VERSION_CHECKPOINTS);
Expand Down

0 comments on commit cd9f414

Please sign in to comment.