Skip to content

Commit

Permalink
[Snapshot Interop] Add Logic in Lock Manager to cleanup stale data post index deletion.
Browse files Browse the repository at this point in the history

Signed-off-by: Harish Bhakuni <[email protected]>
  • Loading branch information
Harish Bhakuni committed Jul 21, 2023
1 parent 1d3b006 commit 3798261
Show file tree
Hide file tree
Showing 7 changed files with 301 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,42 @@ private boolean deleteIfEmpty() throws IOException {
return true;
}

/*
As method name suggests, this method should be called only once index is deleted.
*/
/**
 * Deletes the remote data directory, remote metadata directory and lock-manager state for a shard.
 * As the name suggests, this must be called only after the owning index has been deleted; the
 * cleanup is skipped if any metadata file is still locked (e.g. referenced by a shallow snapshot).
 * The actual deletion runs asynchronously on the REMOTE_PURGE executor.
 *
 * @param threadPool              thread pool providing the REMOTE_PURGE executor used for deletion
 * @param remoteDataDirectory     remote directory holding segment data files
 * @param remoteMetadataDirectory remote directory holding segment metadata files
 * @param mdLockManager           lock manager guarding the metadata files
 * @throws IOException if listing the metadata files from the remote store fails
 */
public static void cleanupPostIndexDeletion(
    ThreadPool threadPool,
    RemoteDirectory remoteDataDirectory,
    RemoteDirectory remoteMetadataDirectory,
    RemoteStoreLockManager mdLockManager
) throws IOException {
    Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
    if (metadataFiles.stream().anyMatch(metadataFile -> {
        try {
            return mdLockManager.isAcquired(FileLockInfo.getLockInfoBuilder().withFileToLock(metadataFile).build());
        } catch (IOException e) {
            // Fail closed: if the lock check errors out, assume the lock exists and skip cleanup.
            // Pass the exception as the throwable argument so the full stack trace is logged,
            // instead of concatenating it into the message.
            logger.error("Assuming lock exists, as Checking lock for metadata " + metadataFile + " is failing with error: ", e);
            return true;
        }
    })) {
        logger.error("Some metadata file still have lock, skipping remote store cleanup");
        return;
    }
    try {
        threadPool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
            try {
                remoteDataDirectory.delete();
                remoteMetadataDirectory.delete();
                mdLockManager.delete();
            } catch (Exception e) {
                // A failed cleanup leaves stale data behind and must be visible to operators:
                // log at error, not info.
                logger.error("Exception occured during directory cleanup post index deletion", e);
            }
        });
    } catch (Exception e) {
        // Best-effort: executor rejection (e.g. during node shutdown) should not propagate.
        logger.error("Exception occurred while deleting directory", e);
    }
}

public void close() throws IOException {
deleteStaleSegmentsAsync(0);
deleteIfEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
Expand Down Expand Up @@ -59,7 +59,7 @@ public Directory newDirectory(String repositoryName, String indexUUID, String sh

RemoteDirectory dataDirectory = createRemoteDirectory(repository, commonBlobPath, "data");
RemoteDirectory metadataDirectory = createRemoteDirectory(repository, commonBlobPath, "metadata");
RemoteStoreMetadataLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager(
RemoteStoreLockManager mdLockManager = RemoteStoreLockManagerFactory.newLockManager(
repositoriesService.get(),
repositoryName,
indexUUID,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.lockmanager;

import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;

/**
 * A {@link RemoteStoreMetadataLockManager} variant intended for use on the cluster manager node.
 * In addition to managing lock files, it holds references to the shard's remote data and metadata
 * directories so it can trigger remote store cleanup once the last lock is released after index
 * deletion. See https://github.com/opensearch-project/OpenSearch/issues/8469
 */
public class LockManagerForClusterManagerNode extends RemoteStoreMetadataLockManager {
    private final RemoteBufferedOutputDirectory dataDirectory;
    private final RemoteBufferedOutputDirectory mdDirectory;
    private final ThreadPool threadPool;

    /**
     * @param dataDirectory remote directory holding segment data files for the shard
     * @param mdDirectory   remote directory holding segment metadata files for the shard
     * @param lockDirectory remote directory holding the lock files (managed by the superclass)
     * @param threadPool    thread pool used to run the async cleanup
     */
    public LockManagerForClusterManagerNode(
        RemoteBufferedOutputDirectory dataDirectory,
        RemoteBufferedOutputDirectory mdDirectory,
        RemoteBufferedOutputDirectory lockDirectory,
        ThreadPool threadPool
    ) {
        super(lockDirectory);
        this.mdDirectory = mdDirectory;
        this.dataDirectory = dataDirectory;
        this.threadPool = threadPool;
    }

    /**
     * releases the lock and cleans up remote store directory if there are no more locks.
     * this should be called only once index is deleted.
     * see https://github.com/opensearch-project/OpenSearch/issues/8469
     * @param lockInfo lock info instance for which lock need to be removed.
     * @throws IOException throws if cleanup or releasing of lock fails.
     */
    @Override
    public void releaseAndCleanup(LockInfo lockInfo) throws IOException {
        release(lockInfo);
        RemoteSegmentStoreDirectory.cleanupPostIndexDeletion(threadPool, dataDirectory, mdDirectory, this);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ public interface RemoteStoreLockManager {
*/
void release(LockInfo lockInfo) throws IOException;

/**
 *
 * releases lock and cleans up remote store directory if there are no more md files.
 * this should be only called from cluster manager node, after remote store index is deleted
 * see https://github.com/opensearch-project/OpenSearch/issues/8469
 * @param lockInfo lock info instance for which lock need to be removed.
 * @throws IOException throws exception in case there is a problem in releasing lock.
 * @throws UnsupportedOperationException if the implementation does not support cleanup
 *                                       (only cluster-manager-aware implementations override this).
 */
default void releaseAndCleanup(LockInfo lockInfo) throws IOException {
    // Include the implementing class in the message so a misconfigured caller can tell
    // which lock manager rejected the operation.
    throw new UnsupportedOperationException("releaseAndCleanup is not supported by " + getClass().getName());
}

/**
*
* @param lockInfo lock info instance for which we need to check if lock is acquired.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.function.Supplier;
Expand All @@ -27,17 +28,24 @@
public class RemoteStoreLockManagerFactory {
private static final String SEGMENTS = "segments";
private static final String LOCK_FILES = "lock_files";
private static final String METADATA = "metadata";
private static final String DATA = "data";
private final Supplier<RepositoriesService> repositoriesService;

/**
 * @param repositoriesService supplier of the repositories service; resolved lazily on each
 *                            lock-manager creation because the service may not be available
 *                            at factory construction time.
 */
public RemoteStoreLockManagerFactory(Supplier<RepositoriesService> repositoriesService) {
this.repositoriesService = repositoriesService;
}

public RemoteStoreMetadataLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
public RemoteStoreLockManager newLockManager(String repositoryName, String indexUUID, String shardId) throws IOException {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId);
}

public static RemoteStoreMetadataLockManager newLockManager(
/**
 * Creates a cluster-manager-capable lock manager for the given shard, resolving the
 * repositories service from this factory's supplier. The thread pool is used for async
 * remote store cleanup after index deletion.
 *
 * @param repositoryName name of the repository backing the remote store
 * @param indexUUID      UUID of the index the shard belongs to
 * @param shardId        shard identifier (as a string)
 * @param threadPool     thread pool used to run cleanup post index deletion
 * @return a lock manager scoped to the shard that can also clean up stale remote data
 * @throws IOException if the required remote directories cannot be created
 */
public RemoteStoreLockManager newLockManager(String repositoryName, String indexUUID, String shardId, ThreadPool threadPool)
throws IOException {
return newLockManager(repositoriesService.get(), repositoryName, indexUUID, shardId, threadPool);
}

public static RemoteStoreLockManager newLockManager(
RepositoriesService repositoriesService,
String repositoryName,
String indexUUID,
Expand All @@ -46,13 +54,27 @@ public static RemoteStoreMetadataLockManager newLockManager(
try (Repository repository = repositoriesService.repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath shardLevelBlobPath = ((BlobStoreRepository) repository).basePath().add(indexUUID).add(shardId).add(SEGMENTS);
RemoteBufferedOutputDirectory shardMDLockDirectory = createRemoteBufferedOutputDirectory(
repository,
shardLevelBlobPath,
LOCK_FILES
);
RemoteBufferedOutputDirectory lockDirectory = createRemoteBufferedOutputDirectory(repository, shardLevelBlobPath, LOCK_FILES);
return new RemoteStoreMetadataLockManager(lockDirectory);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be present to acquire/release lock", e);
}
}

return new RemoteStoreMetadataLockManager(shardMDLockDirectory);
public static RemoteStoreLockManager newLockManager(
RepositoriesService repositoriesService,
String repositoryName,
String indexUUID,
String shardId,
ThreadPool threadPool
) {
try (Repository repository = repositoriesService.repository(repositoryName)) {
assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository";
BlobPath shardLevelBlobPath = ((BlobStoreRepository) repository).basePath().add(indexUUID).add(shardId).add(SEGMENTS);
RemoteBufferedOutputDirectory lockDirectory = createRemoteBufferedOutputDirectory(repository, shardLevelBlobPath, LOCK_FILES);
RemoteBufferedOutputDirectory metadataDirectory = createRemoteBufferedOutputDirectory(repository, shardLevelBlobPath, METADATA);
RemoteBufferedOutputDirectory dataDirectory = createRemoteBufferedOutputDirectory(repository, shardLevelBlobPath, DATA);
return new LockManagerForClusterManagerNode(dataDirectory, metadataDirectory, lockDirectory, threadPool);
} catch (RepositoryMissingException e) {
throw new IllegalArgumentException("Repository should be present to acquire/release lock", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.index.store.lockmanager.RemoteStoreMetadataLockManager;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -616,10 +616,11 @@ public void cloneRemoteStoreIndexShardSnapshot(
RemoteStoreShardShallowCopySnapshot remStoreBasedShardMetadata = (RemoteStoreShardShallowCopySnapshot) indexShardSnapshot;
String indexUUID = remStoreBasedShardMetadata.getIndexUUID();
String remoteStoreRepository = remStoreBasedShardMetadata.getRemoteStoreRepository();
RemoteStoreMetadataLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager(
RemoteStoreLockManager remoteStoreMetadataLockManger = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepository,
indexUUID,
String.valueOf(shardId.shardId())
String.valueOf(shardId.shardId()),
threadPool
);
remoteStoreMetadataLockManger.cloneLock(
FileLockInfo.getLockInfoBuilder().withAcquirerId(source.getUUID()).build(),
Expand Down Expand Up @@ -1072,11 +1073,21 @@ private void executeStaleShardDelete(
// Releasing lock file before deleting the shallow-snap-UUID file because in case of any failure while
// releasing the lock file, we would still have the shallow-snap-UUID file and that would be used during
// next delete operation for releasing this lock file
RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardId);
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory.newLockManager(
remoteStoreRepoForIndex,
indexUUID,
shardId,
threadPool
);
if (!isIndexWithIndexUUIDPresent(clusterService, indexUUID)) {
remoteStoreMetadataLockManager.releaseAndCleanup(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
} else {
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
}
}
}
}
Expand Down Expand Up @@ -1487,6 +1498,15 @@ private void cleanupStaleIndices(
}
}

/**
 * Checks whether the current cluster state still contains an index with the given UUID.
 *
 * @param clusterService cluster service used to read the current cluster state metadata
 * @param indexUUID      index UUID to look for
 * @return true if some index in the cluster state has this UUID, false otherwise
 */
private static boolean isIndexWithIndexUUIDPresent(ClusterService clusterService, String indexUUID) {
    boolean present = false;
    // Scan every index in the cluster state until a matching UUID is found.
    for (final IndexMetadata candidate : clusterService.state().metadata().getIndices().values()) {
        if (indexUUID.equals(candidate.getIndexUUID())) {
            present = true;
            break;
        }
    }
    return present;
}

private void executeOneStaleIndexDelete(
BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
RemoteStoreLockManagerFactory remoteStoreLockManagerFactory,
Expand Down Expand Up @@ -1519,11 +1539,18 @@ private void executeOneStaleIndexDelete(
// Releasing lock files before deleting the shallow-snap-UUID file because in case of any failure
// while releasing the lock file, we would still have the corresponding shallow-snap-UUID file
// and that would be used during next delete operation for releasing this stale lock file
RemoteStoreMetadataLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey());
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
RemoteStoreLockManager remoteStoreMetadataLockManager = remoteStoreLockManagerFactory
.newLockManager(remoteStoreRepoForIndex, indexUUID, shardBlob.getKey(), threadPool);

if (!isIndexWithIndexUUIDPresent(clusterService, indexUUID)) {
remoteStoreMetadataLockManager.releaseAndCleanup(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
} else {
remoteStoreMetadataLockManager.release(
FileLockInfo.getLockInfoBuilder().withAcquirerId(snapshotUUID).build()
);
}
}
}
}
Expand Down
Loading

0 comments on commit 3798261

Please sign in to comment.