Skip to content

Commit

Permalink
[Backport 2.x] [Remote Translog] Trimming based on Remote Segment Sto…
Browse files Browse the repository at this point in the history
…re (#7383) (#7699)

* [Remote Translog] Trimming based on Remote Segment Store  (#7383)

* [Remote Translog] Set min referenced as per remote segment store only
when enabled. Add IT for failover using remote translog as well

Signed-off-by: Gaurav Bafna <[email protected]>
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored May 24, 2023
1 parent 7fda531 commit 636e772
Show file tree
Hide file tree
Showing 13 changed files with 307 additions and 63 deletions.
3 changes: 3 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
codecov:
require_ci_to_pass: yes

ignore:
- "test"

coverage:
precision: 2
round: down
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.nio.file.Path;
import java.util.Collection;

import static java.util.Arrays.asList;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIT extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remore-store-repo";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
Expand All @@ -18,17 +16,13 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -40,9 +34,8 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreIT extends OpenSearchIntegTestCase {
public class RemoteStoreIT extends RemoteStoreBaseIT {

private static final String REPOSITORY_NAME = "test-remore-store-repo";
private static final String INDEX_NAME = "remote-store-test-idx-1";
private static final String TOTAL_OPERATIONS = "total-operations";
private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations";
Expand All @@ -62,13 +55,8 @@ public Settings indexSettings() {
private Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put("index.refresh_interval", "300s")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

Expand All @@ -80,30 +68,6 @@ private Settings remoteTranslogIndexSettings(int numberOfReplicas) {
.build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.opensearch.action.admin.indices.close.CloseIndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Locale;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0)
public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIT {
private int shard_count = 5;

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shard_count)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

public void testPromoteReplicaToPrimary() throws Exception {
internalCluster().startNode();
internalCluster().startNode();
final String indexName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
shard_count = scaledRandomIntBetween(1, 5);
createIndex(indexName);
int numOfDocs = 0;
int numIter = scaledRandomIntBetween(0, 10);
for (int i = 0; i < numIter; i++) {
final int numOfDoc = scaledRandomIntBetween(0, 200);
logger.info("num of docs in iter {} {}", numOfDoc, i);
if (numOfDoc > 0) {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
indexName,
"_doc",
client(),
numOfDoc,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
null
)
) {
indexer.setUseAutoGeneratedIDs(true);
indexer.start(numOfDoc);
waitForIndexed(numOfDoc, indexer);
numOfDocs += numOfDoc;
indexer.stopAndAwaitStopped();
if (random().nextBoolean()) {
// 90% refresh + 10% flush
if (random().nextInt(10) != 0) {
refresh(indexName);
} else {
flush(indexName);
}
}
}
}
}

ensureGreen(indexName);

// sometimes test with a closed index
final IndexMetadata.State indexState = randomFrom(IndexMetadata.State.OPEN, IndexMetadata.State.CLOSE);
if (indexState == IndexMetadata.State.CLOSE) {
CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose(indexName).get();
assertThat("close index not acked - " + closeIndexResponse, closeIndexResponse.isAcknowledged(), equalTo(true));
ensureGreen(indexName);
}

// pick up a data node that contains a random primary shard
ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final int numShards = state.metadata().index(indexName).getNumberOfShards();
final ShardRouting primaryShard = state.routingTable().index(indexName).shard(randomIntBetween(0, numShards - 1)).primaryShard();
final DiscoveryNode randomNode = state.nodes().resolveNode(primaryShard.currentNodeId());

// stop the random data node, all remaining shards are promoted to primaries
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(randomNode.getName()));
ensureYellowAndNoInitializingShards(indexName);

state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(indexName)) {
for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
assertThat(shardRouting + " should be promoted as a primary", shardRouting.primary(), is(true));
}
}

if (indexState == IndexMetadata.State.CLOSE) {
assertAcked(client().admin().indices().prepareOpen(indexName));
ensureYellowAndNoInitializingShards(indexName);
}
refresh(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.index.translog;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.lease.Releasable;
Expand Down Expand Up @@ -47,6 +49,7 @@
*/
public class RemoteFsTranslog extends Translog {

private static final Logger logger = LogManager.getLogger(RemoteFsTranslog.class);
private final BlobStoreRepository blobStoreRepository;
private final TranslogTransferManager translogTransferManager;
private final FileTransferTracker fileTransferTracker;
Expand Down Expand Up @@ -82,7 +85,6 @@ public RemoteFsTranslog(
this.primaryModeSupplier = primaryModeSupplier;
fileTransferTracker = new FileTransferTracker(shardId);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker);

try {
download(translogTransferManager, location);
Checkpoint checkpoint = readCheckpoint(location);
Expand Down Expand Up @@ -131,6 +133,7 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t
}

public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
logger.info("Downloading translog files from remote for shard {} ", translogTransferManager.getShardId());
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
if (Files.notExists(location)) {
Expand All @@ -152,6 +155,7 @@ public static void download(TranslogTransferManager translogTransferManager, Pat
location.resolve(Translog.CHECKPOINT_FILE_NAME)
);
}
logger.info("Downloaded translog files from remote for shard {} ", translogTransferManager.getShardId());
}

public static TranslogTransferManager buildTranslogTransferManager(
Expand All @@ -161,6 +165,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
FileTransferTracker fileTransferTracker
) {
return new TranslogTransferManager(
shardId,
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
fileTransferTracker
Expand Down Expand Up @@ -331,8 +336,9 @@ protected long getMinReferencedGen() throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
long minReferencedGen = Math.min(
deletionPolicy.minTranslogGenRequired(readers, current),
minGenerationForSeqNo(Math.min(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, minSeqNoToKeep), current, readers)
minGenerationForSeqNo(minSeqNoToKeep, current, readers)
);

assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of ["
+ minReferencedGen
+ "] but the lowest gen available is ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,8 @@ TranslogWriter createWriter(
primaryTermSupplier.getAsLong(),
tragedy,
persistedSequenceNumberConsumer,
bigArrays
bigArrays,
indexSettings.isRemoteTranslogStoreEnabled()
);
} catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
Expand Down Expand Up @@ -2025,7 +2026,8 @@ public static String createEmptyTranslog(
seqNo -> {
throw new UnsupportedOperationException();
},
BigArrays.NON_RECYCLING_INSTANCE
BigArrays.NON_RECYCLING_INSTANCE,
null
);
writer.close();
return uuid;
Expand Down
Loading

0 comments on commit 636e772

Please sign in to comment.