Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to skip pinned timestamp in remote segment garbage collector #15017

Merged
merged 6 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,15 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
} else {
// As delete is async, it's possible that the file gets created before the deletion or after
// deletion.
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1))
);
if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
// With pinned timestamp, we also keep md files since last successful fetch
assertTrue(actualFileCount >= lastNMetadataFilesToKeep);
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
} else {
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(lastNMetadataFilesToKeep - 1, lastNMetadataFilesToKeep, lastNMetadataFilesToKeep + 1))
);
}
}
}, 30, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -249,7 +254,12 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
if (RemoteStoreSettings.isPinnedTimestampsEnabled()) {
// With pinned timestamp, we also keep md files since last successful fetch
assertTrue(actualFileCount >= 4);
} else {
assertEquals(4, actualFileCount);
}
}

public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.store.RemoteBufferedOutputDirectory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand Down Expand Up @@ -287,8 +288,14 @@ public void testDeleteMultipleShallowCopySnapshotsCase3() throws Exception {
public void testRemoteStoreCleanupForDeletedIndex() throws Exception {
disableRepoConsistencyCheck("Remote store repository is being used in the test");
final Path remoteStoreRepoPath = randomRepoPath();
internalCluster().startClusterManagerOnlyNode(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath));
internalCluster().startDataOnlyNode(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath));
Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath);
// Disabling pinned timestamp as this test is specifically for shallow snapshot.
settings = Settings.builder()
.put(settings)
.put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), false)
.build();
internalCluster().startClusterManagerOnlyNode(settings);
internalCluster().startDataOnlyNode(settings);
final Client clusterManagerClient = internalCluster().clusterManagerClient();
ensureStableCluster(2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.node.remotestore.RemoteStorePinnedTimestampService;
import org.opensearch.threadpool.ThreadPool;

import java.io.FileNotFoundException;
Expand Down Expand Up @@ -91,6 +92,8 @@

private final RemoteStoreLockManager mdLockManager;

private final Map<Long, String> metadataFilePinnedTimestampMap;
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved

private final ThreadPool threadPool;

/**
Expand Down Expand Up @@ -132,6 +135,7 @@
this.remoteMetadataDirectory = remoteMetadataDirectory;
this.mdLockManager = mdLockManager;
this.threadPool = threadPool;
this.metadataFilePinnedTimestampMap = new HashMap<>();
this.logger = Loggers.getLogger(getClass(), shardId);
init();
}
Expand Down Expand Up @@ -176,6 +180,42 @@
return remoteSegmentMetadata;
}

/**
* Initializes the remote segment metadata to a specific timestamp.
*
* @param timestamp The timestamp to initialize the remote segment metadata to.
* @return The RemoteSegmentMetadata object corresponding to the specified timestamp, or null if no metadata file is found for that timestamp.
* @throws IOException If an I/O error occurs while reading the metadata file.
*/
public RemoteSegmentMetadata initializeToSpecificTimestamp(long timestamp) throws IOException {
List<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
);
Set<String> lockedMetadataFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
metadataFiles,
Set.of(timestamp),
MetadataFilenameUtils::getTimestamp,
MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen
);
if (lockedMetadataFiles.isEmpty()) {
return null;
}
if (lockedMetadataFiles.size() > 1) {
throw new IOException(

Check warning on line 205 in server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java#L205

Added line #L205 was not covered by tests
"Expected exactly one metadata file matching timestamp: " + timestamp + " but got " + lockedMetadataFiles
);
}
String metadataFile = lockedMetadataFiles.iterator().next();
RemoteSegmentMetadata remoteSegmentMetadata = readMetadataFile(metadataFile);
if (remoteSegmentMetadata != null) {
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(remoteSegmentMetadata.getMetadata());
} else {
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>();

Check warning on line 214 in server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java#L214

Added line #L214 was not covered by tests
}
return remoteSegmentMetadata;
}

/**
* Read the latest metadata file to get the list of segments uploaded to the remote segment store.
* We upload a metadata file per refresh, but it is not unique per refresh. Refresh metadata file is unique for a given commit.
Expand Down Expand Up @@ -324,7 +364,8 @@
long translogGeneration,
long uploadCounter,
int metadataVersion,
String nodeId
String nodeId,
long creationTimestamp
) {
return String.join(
SEPARATOR,
Expand All @@ -334,11 +375,30 @@
RemoteStoreUtils.invertLong(translogGeneration),
RemoteStoreUtils.invertLong(uploadCounter),
String.valueOf(Objects.hash(nodeId)),
RemoteStoreUtils.invertLong(System.currentTimeMillis()),
RemoteStoreUtils.invertLong(creationTimestamp),
String.valueOf(metadataVersion)
);
}

/**
 * Builds a metadata filename using the current wall-clock time as the creation timestamp.
 *
 * <p>Convenience overload that delegates to the full variant, passing
 * {@code System.currentTimeMillis()} as the creation timestamp. Kept for callers that do
 * not need to control the timestamp explicitly (e.g. tests and production upload paths).
 *
 * @param primaryTerm       primary term of the shard at upload time
 * @param generation        segment generation
 * @param translogGeneration translog generation
 * @param uploadCounter     monotonically increasing upload counter
 * @param metadataVersion   metadata schema version embedded in the filename
 * @param nodeId            id of the node performing the upload (hashed into the filename)
 * @return the composed metadata filename
 */
public static String getMetadataFilename(
    long primaryTerm,
    long generation,
    long translogGeneration,
    long uploadCounter,
    int metadataVersion,
    String nodeId
) {
    return getMetadataFilename(
        primaryTerm,
        generation,
        translogGeneration,
        uploadCounter,
        metadataVersion,
        nodeId,
        System.currentTimeMillis()
    );
}

// Visible for testing
static long getPrimaryTerm(String[] filenameTokens) {
return RemoteStoreUtils.invertLong(filenameTokens[1]);
Expand Down Expand Up @@ -778,6 +838,7 @@
);
return;
}

List<String> sortedMetadataFileList = remoteMetadataDirectory.listFilesByPrefixInLexicographicOrder(
MetadataFilenameUtils.METADATA_PREFIX,
Integer.MAX_VALUE
Expand All @@ -791,16 +852,44 @@
return;
}

List<String> metadataFilesEligibleToDelete = new ArrayList<>(
sortedMetadataFileList.subList(lastNMetadataFilesToKeep, sortedMetadataFileList.size())
// Check last fetch status of pinned timestamps. If stale, return.
if (RemoteStoreUtils.isPinnedTimestampStateStale()) {
gbbafna marked this conversation as resolved.
Show resolved Hide resolved
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
return;

Check warning on line 858 in server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java#L857-L858

Added lines #L857 - L858 were not covered by tests
}

Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();

Set<String> implicitLockedFiles = RemoteStoreUtils.getPinnedTimestampLockedFiles(
sortedMetadataFileList,
pinnedTimestampsState.v2(),
metadataFilePinnedTimestampMap,
MetadataFilenameUtils::getTimestamp,
MetadataFilenameUtils::getNodeIdByPrimaryTermAndGen
);
Set<String> allLockFiles;
final Set<String> allLockFiles = new HashSet<>(implicitLockedFiles);

try {
allLockFiles = ((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX);
allLockFiles.addAll(
((RemoteStoreMetadataLockManager) mdLockManager).fetchLockedMetadataFiles(MetadataFilenameUtils.METADATA_PREFIX)
);
} catch (Exception e) {
logger.error("Exception while fetching segment metadata lock files, skipping deleteStaleSegments", e);
return;
}

List<String> metadataFilesEligibleToDelete = new ArrayList<>(
sortedMetadataFileList.subList(lastNMetadataFilesToKeep, sortedMetadataFileList.size())
);

// Along with last N files, we need to keep files since last successful run of scheduler
long lastSuccessfulFetchOfPinnedTimestamps = pinnedTimestampsState.v1();
metadataFilesEligibleToDelete = RemoteStoreUtils.filterOutMetadataFilesBasedOnAge(
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
metadataFilesEligibleToDelete,
MetadataFilenameUtils::getTimestamp,
lastSuccessfulFetchOfPinnedTimestamps
);

List<String> metadataFilesToBeDeleted = metadataFilesEligibleToDelete.stream()
.filter(metadataFile -> allLockFiles.contains(metadataFile) == false)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,9 @@ private ActionListener<Void> getListenerForWriteCallResponse(

private PinnedTimestamps readExistingPinnedTimestamps(String blobFilename, RemotePinnedTimestamps remotePinnedTimestamps) {
remotePinnedTimestamps.setBlobFileName(blobFilename);
remotePinnedTimestamps.setFullBlobName(pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps));
remotePinnedTimestamps.setFullBlobName(pinnedTimestampsBlobStore().getBlobPathForUpload(remotePinnedTimestamps));
try {
return pinnedTimestampsBlobStore.read(remotePinnedTimestamps);
return pinnedTimestampsBlobStore().read(remotePinnedTimestamps);
} catch (IOException e) {
throw new RuntimeException("Failed to read existing pinned timestamps", e);
}
Expand All @@ -245,6 +245,14 @@ public static Tuple<Long, Set<Long>> getPinnedTimestamps() {
return pinnedTimestampsSet;
}

// Accessor for the pinned-timestamps blob store; public so tests/subclasses can stub it.
public RemoteStorePinnedTimestampsBlobStore pinnedTimestampsBlobStore() {
    return pinnedTimestampsBlobStore;
}

// Accessor for the blob-store transfer service; public so tests/subclasses can stub it.
public BlobStoreTransferService blobStoreTransferService() {
    return blobStoreTransferService;
}

/**
* Inner class for asynchronously updating the pinned timestamp set.
*/
Expand All @@ -266,11 +274,12 @@ protected void runInternal() {
clusterService.state().metadata().clusterUUID(),
blobStoreRepository.getCompressor()
);
BlobPath path = pinnedTimestampsBlobStore.getBlobPathForUpload(remotePinnedTimestamps);
blobStoreTransferService.listAllInSortedOrder(path, remotePinnedTimestamps.getType(), 1, new ActionListener<>() {
BlobPath path = pinnedTimestampsBlobStore().getBlobPathForUpload(remotePinnedTimestamps);
blobStoreTransferService().listAllInSortedOrder(path, remotePinnedTimestamps.getType(), 1, new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
if (blobMetadata.isEmpty()) {
pinnedTimestampsSet = new Tuple<>(triggerTimestamp, Set.of());
return;
}
PinnedTimestamps pinnedTimestamps = readExistingPinnedTimestamps(blobMetadata.get(0).name(), remotePinnedTimestamps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,5 +1081,4 @@ public void testIsPinnedTimestampStateStaleFeatureEnabled() {
setupRemotePinnedTimestampFeature(true);
assertTrue(RemoteStoreUtils.isPinnedTimestampStateStale());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,47 +43,19 @@ public class BaseRemoteSegmentStoreDirectoryTests extends IndexShardTestCase {
protected SegmentInfos segmentInfos;
protected ThreadPool threadPool;

protected final String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
12,
23,
34,
1,
1,
"node-1"
);
protected String metadataFilename = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 23, 34, 1, 1, "node-1");

protected final String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
protected String metadataFilenameDup = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
12,
23,
34,
2,
1,
"node-2"
);
protected final String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
12,
13,
34,
1,
1,
"node-1"
);
protected final String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
10,
38,
34,
1,
1,
"node-1"
);
protected final String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(
10,
36,
34,
1,
1,
"node-1"
);
protected String metadataFilename2 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(12, 13, 34, 1, 1, "node-1");
protected String metadataFilename3 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(10, 38, 34, 1, 1, "node-1");
protected String metadataFilename4 = RemoteSegmentStoreDirectory.MetadataFilenameUtils.getMetadataFilename(10, 36, 34, 1, 1, "node-1");

public void setupRemoteSegmentStoreDirectory() throws IOException {
remoteDataDirectory = mock(RemoteDirectory.class);
Expand Down
Loading
Loading