Skip to content

Commit

Permalink
[Snapshot Interop] Add Logic in Lock Manager to cleanup stale data post index deletion. (#8472) (#9769)
Browse files Browse the repository at this point in the history

Signed-off-by: Harish Bhakuni <[email protected]>
(cherry picked from commit 5670d2a)
  • Loading branch information
harishbhakuni authored Sep 6, 2023
1 parent 60591ea commit affbd76
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_2_NAME));
}

public int getFileCount(Path path) throws Exception {
public static int getFileCount(Path path) throws Exception {
final AtomicInteger filesExisting = new AtomicInteger(0);
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,29 @@
package org.opensearch.snapshots;

import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.is;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
Expand Down Expand Up @@ -240,12 +246,13 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
final String snapshotRepoName = "snapshot-repo-name";
final Path snapshotRepoPath = randomRepoPath();
createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));
final String testIndex = "index-test";
createIndexWithContent(testIndex);

final Path remoteStoreRepoPath = randomRepoPath();
createRepository(REMOTE_REPO_NAME, "fs", remoteStoreRepoPath);

final String testIndex = "index-test";
createIndexWithContent(testIndex);

final String remoteStoreEnabledIndexName = "remote-index-1";
final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
Expand Down Expand Up @@ -289,6 +296,71 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME).length == 0);
}

/**
 * Verifies that deleting all shallow-copy snapshots of an already-deleted remote-store-backed
 * index triggers cleanup of the index's data in the remote store repository.
 */
public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
    disableRepoConsistencyCheck("Remote store repository is being used in the test");
    FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE);

    internalCluster().startClusterManagerOnlyNode(remoteStoreClusterSettings(REMOTE_REPO_NAME));
    internalCluster().startDataOnlyNode();
    final Client clusterManagerClient = internalCluster().clusterManagerClient();
    ensureStableCluster(2);

    final String snapshotRepoName = "snapshot-repo-name";
    final Path snapshotRepoPath = randomRepoPath();
    createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath));

    final Path remoteStoreRepoPath = randomRepoPath();
    createRepository(REMOTE_REPO_NAME, "fs", remoteStoreRepoPath);

    final String testIndex = "index-test";
    createIndexWithContent(testIndex);

    final String remoteStoreEnabledIndexName = "remote-index-1";
    final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings();
    createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings);
    indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10));

    // Capture the index UUID before deleting the index; it is needed afterwards to
    // locate the index's lock files and its directory inside the remote store repo.
    String indexUUID = client().admin()
        .indices()
        .prepareGetSettings(remoteStoreEnabledIndexName)
        .get()
        .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID);

    logger.info("--> create two remote index shallow snapshots");
    List<String> shallowCopySnapshots = createNSnapshots(snapshotRepoName, 2);

    String[] lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME);
    assert (lockFiles.length == 2) : "lock files are " + Arrays.toString(lockFiles);

    // delete the remote store index
    assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName));

    logger.info("--> delete snapshot 1");
    AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin()
        .cluster()
        .prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshots.get(0))
        .get();
    assertAcked(deleteSnapshotResponse);

    lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME, indexUUID);
    assert (lockFiles.length == 1) : "lock files are " + Arrays.toString(lockFiles);

    logger.info("--> delete snapshot 2");
    deleteSnapshotResponse = clusterManagerClient.admin()
        .cluster()
        .prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshots.get(1))
        .get();
    assertAcked(deleteSnapshotResponse);

    Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID);
    // Remote store deletion is async; poll until the index directory is empty.
    // Do NOT swallow all exceptions here: the previous empty catch block made the
    // assertion pass vacuously whenever getFileCount threw. Only a missing index
    // directory (fully cleaned up) is treated as success.
    assertBusy(() -> {
        try {
            assertThat(RemoteStoreBaseIntegTestCase.getFileCount(indexPath), comparesEqualTo(0));
        } catch (NoSuchFileException e) {
            // the whole index directory being gone also means cleanup completed
        }
    }, 30, TimeUnit.SECONDS);
}

private List<String> createNSnapshots(String repoName, int count) {
final List<String> snapshotNames = new ArrayList<>(count);
final String prefix = "snap-" + UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT) + "-";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,30 +760,37 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}
}

/**
 * Fire-and-forget variant of {@link #deleteStaleSegmentsAsync(int, ActionListener)}:
 * schedules stale segment/metadata deletion and ignores the outcome.
 *
 * @param lastNMetadataFilesToKeep number of metadata files to keep
 */
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
    final ActionListener<Void> ignoreOutcome = ActionListener.wrap(response -> {}, exception -> {});
    deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, ignoreOutcome);
}

/**
* Delete stale segment and metadata files asynchronously.
* This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner.
*
* @param lastNMetadataFilesToKeep number of metadata files to keep
*/
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) {
public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener<Void> listener) {
if (canDeleteStaleCommits.compareAndSet(true, false)) {
try {
threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
try {
deleteStaleSegments(lastNMetadataFilesToKeep);
listener.onResponse(null);
} catch (Exception e) {
logger.info(
logger.error(
"Exception while deleting stale commits from remote segment store, will retry delete post next commit",
e
);
listener.onFailure(e);
} finally {
canDeleteStaleCommits.set(true);
}
});
} catch (Exception e) {
logger.info("Exception occurred while scheduling deleteStaleCommits", e);
logger.error("Exception occurred while scheduling deleteStaleCommits", e);
canDeleteStaleCommits.set(true);
listener.onFailure(e);
}
}
}
Expand Down Expand Up @@ -815,7 +822,6 @@ private boolean deleteIfEmpty() throws IOException {
}

/**
 * Closes the directory: asynchronously deletes all stale segments and, once that
 * cleanup succeeds, removes the remote directory itself if it is empty.
 */
public void close() throws IOException {
    deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public RemoteStoreLockManagerFactory(Supplier<RepositoriesService> repositoriesS
this.repositoriesService = repositoriesService;
}

public RemoteStoreMetadataLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
public RemoteStoreLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId);
}

Expand All @@ -58,6 +58,12 @@ public static RemoteStoreMetadataLockManager newLockManager(
}
}

// TODO: remove this once we add poller in place to trigger remote store cleanup
// see: https://github.com/opensearch-project/OpenSearch/issues/8469
// Exposes the repositories-service supplier so that snapshot deletion code can
// construct a remote segment store directory and clean up data belonging to
// indices that have already been deleted.
public Supplier<RepositoriesService> getRepositoriesService() {
    return repositoriesService;
}

private static RemoteBufferedOutputDirectory createRemoteBufferedOutputDirectory(
Repository repository,
BlobPath commonBlobPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,12 @@
import org.opensearch.index.snapshots.blobstore.RemoteStoreShardShallowCopySnapshot;
import org.opensearch.index.snapshots.blobstore.SlicedInputStream;
import org.opensearch.index.snapshots.blobstore.SnapshotFiles;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -633,7 +634,7 @@ public void cloneRemoteStoreIndexShardSnapshot(
RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = (RemoteStoreShardShallowCopySnapshot) indexShardSnapshot;
String indexUUID = remStoreBasedShardMetadata.getIndexUUID();
String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository();
RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager(
RemoteStoreLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepository,
indexUUID,
String.valueOf(shardId.shardId())
Expand Down Expand Up @@ -1159,11 +1160,24 @@ private void executeStaleShardDelete(
// Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
// releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
// next delete operation for releasing this lock file
RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardId);
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardId
);
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(remoteStoreRepoForIndex, indexUUID, shardId).close();
}
}
}
}
Expand Down Expand Up @@ -1574,6 +1588,15 @@ private void cleanupStaleIndices(
}
}

/**
 * Determines whether any index in the current cluster state carries the given index UUID.
 *
 * @param clusterService source of the current cluster state
 * @param indexUUID index UUID to look for
 * @return {@code true} if an index with this UUID still exists in the cluster metadata
 */
private static boolean isIndexPresent(ClusterService clusterService, String indexUUID) {
    // anyMatch short-circuits on the first hit, mirroring the early-return loop form.
    return clusterService.state()
        .metadata()
        .getIndices()
        .values()
        .stream()
        .anyMatch(indexMetadata -> indexUUID.equals(indexMetadata.getIndexUUID()));
}

private void executeOneStaleIndexDelete(
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
Expand Down Expand Up @@ -1606,11 +1629,21 @@ private void executeOneStaleIndexDelete(
// Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
// while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
// and that would be used during next delete operation for releasing this stale lock file
RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey());
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
if (!isIndexPresent(clusterService, indexUUID)) {
// this is a temporary solution where snapshot deletion triggers remote store side
// cleanup if index is already deleted. We will add a poller in future to take
// care of remote store side cleanup.
// see https://github.com/opensearch-project/OpenSearch/issues/8469
new RemoteSegmentStoreDirectoryFactory(
remoteStoreLockManagerFactory.getRepositoriesService(),
threadPool
).newDirectory(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey()).close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ public void testRetrieveSnapshots() throws Exception {
assertThat(snapshotIds, equalTo(originalSnapshots));
}

// Validate Scenario remoteStoreShallowCopy Snapshot -> remoteStoreShallowCopy Snapshot
// -> remoteStoreShallowCopy Snapshot -> normal snapshot
public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
final BlobStoreRepository repository = setupRepo();
final long pendingGeneration = repository.metadata.pendingGeneration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,11 @@ protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String rem
.prepareGetSettings(remoteStoreIndex)
.get()
.getSetting(remoteStoreIndex, IndexMetadata.SETTING_INDEX_UUID);
return getLockFilesInRemoteStore(remoteStoreIndex, remoteStoreRepositoryName, indexUUID);
}

protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String remoteStoreRepositoryName, String indexUUID)
throws IOException {
final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(remoteStoreRepositoryName);
BlobPath shardLevelBlobPath = remoteStoreRepository.basePath().add(indexUUID).add("0").add("segments").add("lock_files");
Expand Down

0 comments on commit affbd76

Please sign in to comment.