Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Add support to run SegRep integ tests using remote store settings #6192

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.SegmentReplicationIT;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
 * Variant of {@link SegmentReplicationIT} whose indices are created with the remote segment store
 * and the remote translog store enabled. Both stores point at the same {@code fs} repository,
 * which is registered before each test and removed afterwards.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {

    /** Name of the repository used for both the remote segment store and the remote translog store. */
    private static final String REPOSITORY_NAME = "test-remote-store-repo";

    /**
     * Extends the inherited index settings with the remote store and remote translog store
     * settings, both referencing {@link #REPOSITORY_NAME}.
     *
     * @return the index settings applied to indices created by the tests
     */
    @Override
    public Settings indexSettings() {
        return Settings.builder()
            .put(super.indexSettings())
            .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
            .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
            .build();
    }

    /**
     * Adds the remote store feature flag on top of the inherited feature flag settings.
     *
     * @return the feature flag settings with {@link FeatureFlags#REMOTE_STORE} set to {@code "true"}
     */
    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
    }

    /**
     * Starts a cluster-manager-only node and registers the {@code fs} repository at a random
     * repository path before each test.
     */
    @Before
    public void setup() {
        // The cluster scope declares numDataNodes = 0, so a node is started here explicitly
        // before the repository is registered.
        internalCluster().startClusterManagerOnlyNode();
        Path absolutePath = randomRepoPath().toAbsolutePath();
        assertAcked(
            clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
        );
    }

    /** Deletes the repository registered in {@link #setup()} after each test. */
    @After
    public void teardown() {
        assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ public ReadOnlyEngine(
}
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
if (config.getIndexSettings().isRemoteTranslogStoreEnabled() == false) {
ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats);
}
}
this.seqNoStats = seqNoStats;
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
Expand Down Expand Up @@ -186,7 +188,7 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStat
// In addition to that we only execute the check if the index the engine belongs to has been
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
// that guarantee that all operations have been flushed to Lucene.
assert assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint());
assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint());
if (seqNoStats.getMaxSeqNo() != seqNoStats.getGlobalCheckpoint()) {
throw new IllegalStateException(
"Maximum sequence number ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,8 +1093,12 @@ public long getAsLong() {
* @param reason the reason the global checkpoint was updated
*/
public synchronized void updateGlobalCheckpointOnReplica(final long newGlobalCheckpoint, final String reason) {
assert invariant();
// Replica-only entry point: the tracker must not be in primary mode when this is called.
assert primaryMode == false;
// Hand the update itself off to updateGlobalCheckpoint.
updateGlobalCheckpoint(newGlobalCheckpoint, reason);
}

public synchronized void updateGlobalCheckpoint(final long newGlobalCheckpoint, final String reason) {
assert invariant();
/*
* The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary
* information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3041,7 +3041,8 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S
* while the global checkpoint update may have emanated from the primary when we were in that state, we could subsequently move
* to recovery finalization, or even finished recovery before the update arrives here.
*/
assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED
assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED)
|| (indexSettings.isRemoteTranslogStoreEnabled() == true && state() != IndexShardState.RECOVERING)
: "supposedly in-sync shard copy received a global checkpoint ["
+ globalCheckpoint
+ "] "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.FileTransferTracker;
Expand Down Expand Up @@ -83,6 +84,9 @@ public RemoteFsTranslog(
try {
download(translogTransferManager, location);
Checkpoint checkpoint = readCheckpoint(location);
assert globalCheckpointSupplier instanceof ReplicationTracker
: "globalCheckpointSupplier is not instance of ReplicationTracker";
((ReplicationTracker) globalCheckpointSupplier).updateGlobalCheckpoint(checkpoint.globalCheckpoint, "RemoteFsTranslog init");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, this update should happen in IndexShard instead of RemoteFsTranslog. As part of recovery, we can download the translog files and update the replicationTracker's global checkpoint.

this.readers.addAll(recoverFromFiles(checkpoint));
if (readers.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import org.opensearch.env.TestEnvironment;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.MissingHistoryOperationsException;
import org.opensearch.index.engine.SafeCommitInfo;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.LocalCheckpointTrackerTests;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
Expand Down Expand Up @@ -93,6 +95,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;

Expand All @@ -103,7 +106,7 @@ public class RemoteFSTranslogTests extends OpenSearchTestCase {
protected final ShardId shardId = new ShardId("index", "_na_", 1);

protected RemoteFsTranslog translog;
private AtomicLong globalCheckpoint;
private ReplicationTracker replicationTracker;
protected Path translogDir;
// A default primary term is used by translog instances created in this test.
private final AtomicLong primaryTerm = new AtomicLong();
Expand Down Expand Up @@ -155,7 +158,17 @@ private RemoteFsTranslog create(Path path) throws IOException {

private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException {
this.repository = repository;
globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
replicationTracker = new ReplicationTracker(
shardId,
translogUUID,
IndexSettingsModule.newIndexSettings("index", Settings.EMPTY),
primaryTerm.get(),
UNASSIGNED_SEQ_NO,
value -> {},
System::currentTimeMillis,
(leases, listener) -> {},
() -> SafeCommitInfo.EMPTY
);
final TranslogConfig translogConfig = getTranslogConfig(path);
final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings());
threadPool = new TestThreadPool(getClass().getName());
Expand All @@ -167,7 +180,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin
translogConfig,
translogUUID,
deletionPolicy,
() -> globalCheckpoint.get(),
replicationTracker,
primaryTerm::get,
getPersistedSeqNoConsumer(),
repository,
Expand Down Expand Up @@ -1160,7 +1173,7 @@ public int write(ByteBuffer src) throws IOException {
config,
translogUUID,
new DefaultTranslogDeletionPolicy(-1, -1, 0),
() -> SequenceNumbers.NO_OPS_PERFORMED,
replicationTracker,
primaryTerm::get,
persistedSeqNos::add,
repository,
Expand Down