diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 4a85ff46d9025..ec58c29175e16 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -159,7 +159,7 @@ public void teardown() { assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_2_NAME)); } - public int getFileCount(Path path) throws Exception { + public static int getFileCount(Path path) throws Exception { final AtomicInteger filesExisting = new AtomicInteger(0); Files.walkFileTree(path, new SimpleFileVisitor<>() { @Override diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java index b12fbdd2a9bd7..d38620723a8f4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java @@ -10,22 +10,28 @@ import org.opensearch.action.ActionFuture; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchIntegTestCase; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import static org.hamcrest.Matchers.is; import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.hamcrest.Matchers.comparesEqualTo; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -240,12 +246,13 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception { final String snapshotRepoName = "snapshot-repo-name"; final Path snapshotRepoPath = randomRepoPath(); createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath)); - final String testIndex = "index-test"; - createIndexWithContent(testIndex); final Path remoteStoreRepoPath = randomRepoPath(); createRepository(REMOTE_REPO_NAME, "fs", remoteStoreRepoPath); + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + final String remoteStoreEnabledIndexName = "remote-index-1"; final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(); createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); @@ -289,6 +296,71 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception { assert (getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME).length == 0); } + public void testRemoteStoreCleanupForDeletedIndex() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + FeatureFlagSetter.set(FeatureFlags.REMOTE_STORE); + + internalCluster().startClusterManagerOnlyNode(remoteStoreClusterSettings(REMOTE_REPO_NAME)); + internalCluster().startDataOnlyNode(); + final Client clusterManagerClient = internalCluster().clusterManagerClient(); + ensureStableCluster(2); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowCopy(snapshotRepoPath)); + + final Path remoteStoreRepoPath = randomRepoPath(); + createRepository(REMOTE_REPO_NAME, "fs", remoteStoreRepoPath); + + final String testIndex = "index-test"; + createIndexWithContent(testIndex); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = getRemoteStoreBackedIndexSettings(); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + indexRandomDocs(remoteStoreEnabledIndexName, randomIntBetween(5, 10)); + + String indexUUID = client().admin() + .indices() + .prepareGetSettings(remoteStoreEnabledIndexName) + .get() + .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); + + logger.info("--> create two remote index shallow snapshots"); + List shallowCopySnapshots = createNSnapshots(snapshotRepoName, 2); + + String[] lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME); + assert (lockFiles.length == 2) : "lock files are " + Arrays.toString(lockFiles); + + // delete remote store index + assertAcked(client().admin().indices().prepareDelete(remoteStoreEnabledIndexName)); + + logger.info("--> delete snapshot 1"); + AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() + .cluster() + .prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshots.get(0)) + .get(); + assertAcked(deleteSnapshotResponse); + + lockFiles = getLockFilesInRemoteStore(remoteStoreEnabledIndexName, REMOTE_REPO_NAME, indexUUID); + assert (lockFiles.length == 1) : "lock files are " + Arrays.toString(lockFiles); + + logger.info("--> delete snapshot 2"); + deleteSnapshotResponse = clusterManagerClient.admin() + .cluster() + .prepareDeleteSnapshot(snapshotRepoName, shallowCopySnapshots.get(1)) + .get(); + assertAcked(deleteSnapshotResponse); + + Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); + // Delete is async. Give time for it + assertBusy(() -> { + try { + assertThat(RemoteStoreBaseIntegTestCase.getFileCount(indexPath), comparesEqualTo(0)); + } catch (Exception e) {} + }, 30, TimeUnit.SECONDS); + } + private List createNSnapshots(String repoName, int count) { final List snapshotNames = new ArrayList<>(count); final String prefix = "snap-" + UUIDs.randomBase64UUID(random()).toLowerCase(Locale.ROOT) + "-"; diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 8dfdb3e2c8e06..9ac5ebb94a5ca 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -837,29 +837,36 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } } + public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) { + deleteStaleSegmentsAsync(lastNMetadataFilesToKeep, ActionListener.wrap(r -> {}, e -> {})); + } + /** * Delete stale segment and metadata files asynchronously. * This method calls {@link RemoteSegmentStoreDirectory#deleteStaleSegments(int)} in an async manner. * @param lastNMetadataFilesToKeep number of metadata files to keep */ - public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep) { + public void deleteStaleSegmentsAsync(int lastNMetadataFilesToKeep, ActionListener listener) { if (canDeleteStaleCommits.compareAndSet(true, false)) { try { threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> { try { deleteStaleSegments(lastNMetadataFilesToKeep); + listener.onResponse(null); } catch (Exception e) { - logger.info( + logger.error( "Exception while deleting stale commits from remote segment store, will retry delete post next commit", e ); + listener.onFailure(e); } finally { canDeleteStaleCommits.set(true); } }); } catch (Exception e) { - logger.info("Exception occurred while scheduling deleteStaleCommits", e); + logger.error("Exception occurred while scheduling deleteStaleCommits", e); canDeleteStaleCommits.set(true); + listener.onFailure(e); } } } @@ -891,7 +898,6 @@ private boolean deleteIfEmpty() throws IOException { } public void close() throws IOException { - deleteStaleSegmentsAsync(0); - deleteIfEmpty(); + deleteStaleSegmentsAsync(0, ActionListener.wrap(r -> deleteIfEmpty(), e -> logger.error("Failed to cleanup remote directory"))); } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java index 3bec84f287ce4..3de7a706c0688 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java @@ -13,8 +13,8 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.index.IndexSettings; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; -import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -59,7 +59,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data"); RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata"); - RemoteStoreMetadataLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager( + RemoteStoreLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager( repositoriesService.get(), repositoryName, indexUUID, diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactory.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactory.java index e866551eae143..1a306f3261094 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactory.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManagerFactory.java @@ -33,7 +33,7 @@ public RemoteStoreLockManagerFactory(Supplier repositoriesS this.repositoriesService = repositoriesService; } - public RemoteStoreMetadataLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException { + public RemoteStoreLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException { return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId); } @@ -58,6 +58,12 @@ public static RemoteStoreMetadataLockManager newLockManager( } } + // TODO: remove this once we add poller in place to trigger remote store cleanup + // see: https://github.com/opensearch-project/OpenSearch/issues/8469 + public Supplier getRepositoriesService() { + return repositoriesService; + } + private static RemoteBufferedOutputDirectory createRemoteBufferedOutputDirectory( Repository repository, BlobPath commonBlobPath, diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 70db2e0c0a9bd..f24c255f0d0da 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -113,11 +113,12 @@ import org.opensearch.index.snapshots.blobstore.RateLimitingInputStream; import org.opensearch.index.snapshots.blobstore.SlicedInputStream; import org.opensearch.index.snapshots.blobstore.SnapshotFiles; +import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.lockmanager.FileLockInfo; +import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory; -import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.repositories.IndexId; @@ -616,7 +617,7 @@ public void cloneRemoteStoreIndexShardSnapshot( RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = (RemoteStoreShardShallowCopySnapshot) indexShardSnapshot; String indexUUID = remStoreBasedShardMetadata.getIndexUUID(); String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository(); - RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager( + RemoteStoreLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager( remoteStoreRepository, indexUUID, String.valueOf(shardId.shardId()) @@ -1072,11 +1073,24 @@ private void executeStaleShardDelete( // Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while // releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during // next delete operation for releasing this lock file - RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory - .newLockManager(remoteStoreRepoForIndex, indexUUID, shardId); + RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager( + remoteStoreRepoForIndex, + indexUUID, + shardId + ); remoteStoreMetadataLockManager.release( FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build() ); + if (!isIndexPresent(clusterService, indexUUID)) { + // this is a temporary solution where snapshot deletion triggers remote store side + // cleanup if index is already deleted. We will add a poller in future to take + // care of remote store side cleanup. + // see https://github.com/opensearch-project/OpenSearch/issues/8469 + new RemoteSegmentStoreDirectoryFactory( + remoteStoreLockManagerFactory.getRepositoriesService(), + threadPool + ).newDirectory(remoteStoreRepoForIndex, indexUUID, shardId).close(); + } } } } @@ -1487,6 +1501,15 @@ private void cleanupStaleIndices( } } + private static boolean isIndexPresent(ClusterService clusterService, String indexUUID) { + for (final IndexMetadata indexMetadata : clusterService.state().metadata().getIndices().values()) { + if (indexUUID.equals(indexMetadata.getIndexUUID())) { + return true; + } + } + return false; + } + private void executeOneStaleIndexDelete( BlockingQueue> staleIndicesToDelete, RemoteStoreLockManagerFactory remoteStoreLockManagerFactory, @@ -1519,11 +1542,21 @@ private void executeOneStaleIndexDelete( // Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure // while releasing the lock file, we would still have the corresponding shallow-snap-UUID file // and that would be used during next delete operation for releasing this stale lock file - RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory + RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory .newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey()); remoteStoreMetadataLockManager.release( FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build() ); + if (!isIndexPresent(clusterService, indexUUID)) { + // this is a temporary solution where snapshot deletion triggers remote store side + // cleanup if index is already deleted. We will add a poller in future to take + // care of remote store side cleanup. + // see https://github.com/opensearch-project/OpenSearch/issues/8469 + new RemoteSegmentStoreDirectoryFactory( + remoteStoreLockManagerFactory.getRepositoriesService(), + threadPool + ).newDirectory(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey()).close(); + } } } } diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java index 26082f2456867..ee9181dba4563 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -166,8 +166,6 @@ public void testRetrieveSnapshots() throws Exception { assertThat(snapshotIds, equalTo(originalSnapshots)); } - // Validate Scenario remoteStoreShallowCopy Snapshot -> remoteStoreShallowCopy Snapshot - // -> remoteStoreShallowCopy Snapshot -> normal snapshot public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { final BlobStoreRepository repository = setupRepo(); final long pendingGeneration = repository.metadata.pendingGeneration(); diff --git a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java index 2c6f4f7b15e5d..9c3f342e58111 100644 --- a/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -557,6 +557,11 @@ protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String rem .prepareGetSettings(remoteStoreIndex) .get() .getSetting(remoteStoreIndex, IndexMetadata.SETTING_INDEX_UUID); + return getLockFilesInRemoteStore(remoteStoreIndex, remoteStoreRepositoryName, indexUUID); + } + + protected String[] getLockFilesInRemoteStore(String remoteStoreIndex, String remoteStoreRepositoryName, String indexUUID) + throws IOException { final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class); final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(remoteStoreRepositoryName); BlobPath shardLevelBlobPath = remoteStoreRepository.basePath().add(indexUUID).add("0").add("segments").add("lock_files");