Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Remote Translog] Trimming based on Remote Segment Store (#7383) #7699

Merged
merged 2 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
codecov:
require_ci_to_pass: yes

ignore:
- "test"

coverage:
precision: 2
round: down
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.nio.file.Path;
import java.util.Collection;

import static java.util.Arrays.asList;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIT extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remore-store-repo";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
Expand All @@ -18,17 +16,13 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
Expand All @@ -40,9 +34,8 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreIT extends OpenSearchIntegTestCase {
public class RemoteStoreIT extends RemoteStoreBaseIT {

private static final String REPOSITORY_NAME = "test-remore-store-repo";
private static final String INDEX_NAME = "remote-store-test-idx-1";
private static final String TOTAL_OPERATIONS = "total-operations";
private static final String REFRESHED_OR_FLUSHED_OPERATIONS = "refreshed-or-flushed-operations";
Expand All @@ -62,13 +55,8 @@ public Settings indexSettings() {
private Settings remoteStoreIndexSettings(int numberOfReplicas) {
return Settings.builder()
.put(super.indexSettings())
.put("index.refresh_interval", "300s")
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

Expand All @@ -80,30 +68,6 @@ private Settings remoteTranslogIndexSettings(int numberOfReplicas) {
.build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.opensearch.action.admin.indices.close.CloseIndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Locale;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0)
public class ReplicaToPrimaryPromotionIT extends RemoteStoreBaseIT {
private int shard_count = 5;

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shard_count)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

public void testPromoteReplicaToPrimary() throws Exception {
internalCluster().startNode();
internalCluster().startNode();
final String indexName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
shard_count = scaledRandomIntBetween(1, 5);
createIndex(indexName);
int numOfDocs = 0;
int numIter = scaledRandomIntBetween(0, 10);
for (int i = 0; i < numIter; i++) {
final int numOfDoc = scaledRandomIntBetween(0, 200);
logger.info("num of docs in iter {} {}", numOfDoc, i);
if (numOfDoc > 0) {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
indexName,
"_doc",
client(),
numOfDoc,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
null
)
) {
indexer.setUseAutoGeneratedIDs(true);
indexer.start(numOfDoc);
waitForIndexed(numOfDoc, indexer);
numOfDocs += numOfDoc;
indexer.stopAndAwaitStopped();
if (random().nextBoolean()) {
// 90% refresh + 10% flush
if (random().nextInt(10) != 0) {
refresh(indexName);
} else {
flush(indexName);
}
}
}
}
}

ensureGreen(indexName);

// sometimes test with a closed index
final IndexMetadata.State indexState = randomFrom(IndexMetadata.State.OPEN, IndexMetadata.State.CLOSE);
if (indexState == IndexMetadata.State.CLOSE) {
CloseIndexResponse closeIndexResponse = client().admin().indices().prepareClose(indexName).get();
assertThat("close index not acked - " + closeIndexResponse, closeIndexResponse.isAcknowledged(), equalTo(true));
ensureGreen(indexName);
}

// pick up a data node that contains a random primary shard
ClusterState state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
final int numShards = state.metadata().index(indexName).getNumberOfShards();
final ShardRouting primaryShard = state.routingTable().index(indexName).shard(randomIntBetween(0, numShards - 1)).primaryShard();
final DiscoveryNode randomNode = state.nodes().resolveNode(primaryShard.currentNodeId());

// stop the random data node, all remaining shards are promoted to primaries
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(randomNode.getName()));
ensureYellowAndNoInitializingShards(indexName);

state = client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(indexName)) {
for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
assertThat(shardRouting + " should be promoted as a primary", shardRouting.primary(), is(true));
}
}

if (indexState == IndexMetadata.State.CLOSE) {
assertAcked(client().admin().indices().prepareOpen(indexName));
ensureYellowAndNoInitializingShards(indexName);
}
refresh(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.index.translog;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.SetOnce;
import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.lease.Releasable;
Expand Down Expand Up @@ -47,6 +49,7 @@
*/
public class RemoteFsTranslog extends Translog {

private static final Logger logger = LogManager.getLogger(RemoteFsTranslog.class);
private final BlobStoreRepository blobStoreRepository;
private final TranslogTransferManager translogTransferManager;
private final FileTransferTracker fileTransferTracker;
Expand Down Expand Up @@ -82,7 +85,6 @@ public RemoteFsTranslog(
this.primaryModeSupplier = primaryModeSupplier;
fileTransferTracker = new FileTransferTracker(shardId);
this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker);

try {
download(translogTransferManager, location);
Checkpoint checkpoint = readCheckpoint(location);
Expand Down Expand Up @@ -131,6 +133,7 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t
}

public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException {
logger.info("Downloading translog files from remote for shard {} ", translogTransferManager.getShardId());
TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata();
if (translogMetadata != null) {
if (Files.notExists(location)) {
Expand All @@ -152,6 +155,7 @@ public static void download(TranslogTransferManager translogTransferManager, Pat
location.resolve(Translog.CHECKPOINT_FILE_NAME)
);
}
logger.info("Downloaded translog files from remote for shard {} ", translogTransferManager.getShardId());
}

public static TranslogTransferManager buildTranslogTransferManager(
Expand All @@ -161,6 +165,7 @@ public static TranslogTransferManager buildTranslogTransferManager(
FileTransferTracker fileTransferTracker
) {
return new TranslogTransferManager(
shardId,
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())),
fileTransferTracker
Expand Down Expand Up @@ -331,8 +336,9 @@ protected long getMinReferencedGen() throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
long minReferencedGen = Math.min(
deletionPolicy.minTranslogGenRequired(readers, current),
minGenerationForSeqNo(Math.min(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, minSeqNoToKeep), current, readers)
minGenerationForSeqNo(minSeqNoToKeep, current, readers)
);

assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of ["
+ minReferencedGen
+ "] but the lowest gen available is ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,8 @@ TranslogWriter createWriter(
primaryTermSupplier.getAsLong(),
tragedy,
persistedSequenceNumberConsumer,
bigArrays
bigArrays,
indexSettings.isRemoteTranslogStoreEnabled()
);
} catch (final IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
Expand Down Expand Up @@ -2025,7 +2026,8 @@ public static String createEmptyTranslog(
seqNo -> {
throw new UnsupportedOperationException();
},
BigArrays.NON_RECYCLING_INSTANCE
BigArrays.NON_RECYCLING_INSTANCE,
null
);
writer.close();
return uuid;
Expand Down
Loading