Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Snapshot Interop] Add Logic in Lock Manager to cleanup stale data po… #8472

Merged
merged 1 commit into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_2_NAME));
}

public int getFileCount(Path path) throws Exception {
public static int getFileCount(Path path) throws Exception {
final AtomicInteger filesExisting = new AtomicInteger(0);
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,28 @@

import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.is;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -240,12 +246,13 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
final String testIndex = "index-test";
createIndexWithContent(testIndex);

final Path remoteStoreRepoPath = randomRepoPath();
createRepository(REMOTE_REPO_NAME, "fs", remoteStoreRepoPath);

final String testIndex = "index-test";
createIndexWithContent(testIndex);

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
Expand Down Expand Up @@ -289,6 +296,71 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME).length == 0);
}

public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);

internalCluster().startClusterManagerOnlyNode(remoteStoreClusterSettings(REMOTE_REPO_NAME));
internalCluster().startDataOnlyNode();
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(2);

final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));

final Path remoteStoreRepoPath = randomRepoPath();
createRepository(REMOTE_REPO_NAME, "fs", remoteStoreRepoPath);

final String testIndex = "index-test";
createIndexWithContent(testIndex);

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

String indexUUID = client().admin()
.indices()
.prepareGetSettings(remoteStoreEnabledIndexName)
.get()
.getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

logger.info("--> create two remote index shallow snapshots");
List<String> shallowCopySnapshots = createNSnapshots(snapshotRepoName, 2);

String[] lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME);
assert (lockFiles.length == 2) : "lock files are " + Arrays.toString(lockFiles);

// delete remote store index
assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

logger.info("--> delete snapshot 1");
AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshots.get(0))
.get();
assertAcked(deleteSnapshotResponse);

lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME, indexUUID);
assert (lockFiles.length == 1) : "lock files are " + Arrays.toString(lockFiles);

logger.info("--> delete snapshot 2");
deleteSnapshotResponse = clusterManagerClient.admin()
.cluster()
.prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshots.get(1))
.get();
assertAcked(deleteSnapshotResponse);

Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
// Delete is async. Give time for it
assertBusy(() -> {
try {
assertThat(RemoteStoreBaseIntegTestCase.getFileCount(indexPath), comparesEqualTo(0));
} catch (Exception e) {}
}, 30, TimeUnit.SECONDS);
}

private List<String> createNSnapshots(String repoName, int count) {
final List<String> snapshotNames = new ArrayList<>(count);
final String prefix = "snap-" + UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT) + "-";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -837,29 +837,36 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}
}

public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, ActionListener.wrap(r -> {}, e -> {}));
}

/**
* Delete stale segment and metadata files asynchronously.
* This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner.
* @param lastNMetadataFilesToKeep number of metadata files to keep
*/
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener<Void> listener) {
if (canDeleteStaleCommits.compareAndSet(true, false)) {
try {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
deleteStaleSegments(lastNMetadataFilesToKeep);
listener.onResponse(null);
} catch (Exception e) {
logger.info(
logger.error(
"Exception while deleting stale commits from remote segment store, will retry delete post next commit",
e
);
listener.onFailure(e);
} finally {
canDeleteStaleCommits.set(true);
}
});
} catch (Exception e) {
logger.info("Exception occurred while scheduling deleteStaleCommits", e);
logger.error("Exception occurred while scheduling deleteStaleCommits", e);
canDeleteStaleCommits.set(true);
listener.onFailure(e);
}
}
}
Expand Down Expand Up @@ -891,7 +898,6 @@ private boolean deleteIfEmpty() throws IOException {
}

public void close() throws IOException {
deleteStaleSegmentsAsync(0);
deleteIfEmpty();
deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -59,7 +59,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh

RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data");
RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata");
RemoteStoreMetadataLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager(
RemoteStoreLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager(
repositoriesService.get(),
repositoryName,
indexUUID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public RemoteStoreLockManagerFactory(Supplier<RepositoriesService> repositoriesS
this.repositoriesService = repositoriesService;
}

public RemoteStoreMetadataLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
public RemoteStoreLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId);
}

Expand All @@ -58,6 +58,12 @@ public static RemoteStoreMetadataLockManager newLockManager(
}
}

// TODO: remove this once we add poller in place to trigger remote store cleanup
// see: https://github.com/opensearch-project/OpenSearch/issues/8469
public Supplier<RepositoriesService> getRepositoriesService() {
return repositoriesService;
}

private static RemoteBufferedOutputDirectory createRemoteBufferedOutputDirectory(
Repository repository,
BlobPath commonBlobPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,12 @@
import org.opensearch.index.snapshots.blobstore.RateLimitingInputStream;
import org.opensearch.index.snapshots.blobstore.SlicedInputStream;
import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -616,7 +617,7 @@ public void cloneRemoteStoreIndexShardSnapshot(
RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = (RemoteStoreShardShallowCopySnapshot) indexShardSnapshot;
String indexUUID = remStoreBasedShardMetadata.getIndexUUID();
String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository();
RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager(
RemoteStoreLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepository,
indexUUID,
String.valueOf(shardId.shardId())
Expand Down Expand Up @@ -1072,11 +1073,24 @@ private void executeStaleShardDelete(
// Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
// releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
// next delete operation for releasing this lock file
RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardId);
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(remoteStoreRepoForIndex, indexUUID, shardId).close();
}
}
}
}
Expand Down Expand Up @@ -1487,6 +1501,15 @@ private void cleanupStaleIndices(
}
}

private static boolean isIndexPresent(ClusterService clusterService, String indexUUID) {
for (final IndexMetadata indexMetadata : clusterService.state().metadata().getIndices().values()) {
harishbhakuni marked this conversation as resolved.
Show resolved Hide resolved
if (indexUUID.equals(indexMetadata.getIndexUUID())) {
return true;
}
}
return false;
}

private void executeOneStaleIndexDelete(
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
Expand Down Expand Up @@ -1519,11 +1542,21 @@ private void executeOneStaleIndexDelete(
// Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
// while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
// and that would be used during next delete operation for releasing this stale lock file
RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey());
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey()).close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ public void testRetrieveSnapshots() throws Exception {
assertThat(snapshotIds, equalTo(originalSnapshots));
}

// Validate Scenario remoteStoreShallowCopy Snapshot -> remoteStoreShallowCopy Snapshot
// -> remoteStoreShallowCopy Snapshot -> normal snapshot
public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
final BlobStoreRepository repository = setupRepo();
final long pendingGeneration = repository.metadata.pendingGeneration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,11 @@ protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String rem
.prepareGetSettings(remoteStoreIndex)
.get()
.getSetting(remoteStoreIndex, IndexMetadata.SETTING_INDEX_UUID);
return getLockFilesInRemoteStore(remoteStoreIndex, remoteStoreRepositoryName, indexUUID);
}

protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String remoteStoreRepositoryName, String indexUUID)
throws IOException {
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(remoteStoreRepositoryName);
BlobPath shardLevelBlobPath = remoteStoreRepository.basePath().add(indexUUID).add("0").add("segments").add("lock_files");
Expand Down