From 74140432ad876738421ee214e142100cc2fa5687 Mon Sep 17 00:00:00 2001 From: Varun Bansal Date: Fri, 2 Feb 2024 15:18:12 +0530 Subject: [PATCH] Create a clone of local segements size map used for Remote Segment Stats until sync to remote completes (#11896) Signed-off-by: bansvaru Signed-off-by: Shivansh Arora --- .../remote/RemoteSegmentTransferTracker.java | 10 ++++-- .../shard/RemoteStoreRefreshListener.java | 34 +++++++++++++------ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index fe9440813b94f..92436a09a4e7e 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -301,12 +301,17 @@ public Map getLatestLocalFileNameLengthMap() { } /** - * Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. The method is given a function as an argument which is used for determining the file size (length in bytes). This method is also provided the collection of segment files which are the latest refresh local segment files. This method also removes the stale segment files from the map that are not part of the input segment files. + * Updates the latestLocalFileNameLengthMap by adding file name and it's size to the map. + * The method is given a function as an argument which is used for determining the file size (length in bytes). + * This method is also provided the collection of segment files which are the latest refresh local segment files. + * This method also removes the stale segment files from the map that are not part of the input segment files. * * @param segmentFiles list of local refreshed segment files * @param fileSizeFunction function is used to determine the file size in bytes + * + * @return updated map of local segment files and filesize */ - public void updateLatestLocalFileNameLengthMap( + public Map updateLatestLocalFileNameLengthMap( Collection segmentFiles, CheckedFunction fileSizeFunction ) { @@ -332,6 +337,7 @@ public void updateLatestLocalFileNameLengthMap( // Remove keys from the fileSizeMap that do not exist in the latest segment files latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false); computeBytesLag(); + return Collections.unmodifiableMap(latestLocalFileNameLengthMap); } public void addToLatestUploadedFiles(String file) { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index e96a4bb19a960..f75e86540ed9b 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -226,7 +226,9 @@ private boolean syncSegments() { Collection localSegmentsPostRefresh = segmentInfos.files(true); // Create a map of file name to size and update the refresh segment tracker - updateLocalSizeMapAndTracker(localSegmentsPostRefresh); + Map localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh).entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); CountDownLatch latch = new CountDownLatch(1); ActionListener segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() { @Override @@ -242,6 +244,7 @@ public void onResponse(Void unused) { refreshClockTimeMs, refreshSeqNo, lastRefreshedCheckpoint, + localSegmentsSizeMap, checkpoint ); // At this point since we have uploaded new segments, segment infos and segment metadata file, @@ -262,7 +265,7 @@ public void onFailure(Exception e) { }, latch); // Start the segments files upload - uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener); + uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener); latch.await(); } catch (EngineException e) { logger.warn("Exception while reading SegmentInfosSnapshot", e); @@ -306,10 +309,11 @@ private void onSuccessfulSegmentsSync( long refreshClockTimeMs, long refreshSeqNo, long lastRefreshedCheckpoint, + Map localFileSizeMap, ReplicationCheckpoint checkpoint ) { // Update latest uploaded segment files name in segment tracker - segmentTracker.setLatestUploadedFiles(segmentTracker.getLatestLocalFileNameLengthMap().keySet()); + segmentTracker.setLatestUploadedFiles(localFileSizeMap.keySet()); // Update the remote refresh time and refresh seq no updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo); // Reset the backoffDelayIterator for the future failures @@ -382,7 +386,11 @@ void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos se } } - private void uploadNewSegments(Collection localSegmentsPostRefresh, ActionListener listener) { + private void uploadNewSegments( + Collection localSegmentsPostRefresh, + Map localSegmentsSizeMap, + ActionListener listener + ) { Collection filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList()); if (filteredFiles.size() == 0) { logger.debug("No new segments to upload in uploadNewSegments"); @@ -396,7 +404,7 @@ private void uploadNewSegments(Collection localSegmentsPostRefresh, Acti for (String src : filteredFiles) { // Initializing listener here to ensure that the stats increment operations are thread-safe - UploadListener statsListener = createUploadListener(); + UploadListener statsListener = createUploadListener(localSegmentsSizeMap); ActionListener aggregatedListener = ActionListener.wrap(resp -> { statsListener.onSuccess(src); batchUploadListener.onResponse(resp); @@ -455,9 +463,11 @@ private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshClo * Updates map of file name to size of the input segment files in the segment tracker. Uses {@code storeDirectory.fileLength(file)} to get the size. * * @param segmentFiles list of segment files that are part of the most recent local refresh. + * + * @return updated map of local segment files and filesize */ - private void updateLocalSizeMapAndTracker(Collection segmentFiles) { - segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength); + private Map updateLocalSizeMapAndTracker(Collection segmentFiles) { + return segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength); } private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) { @@ -532,22 +542,24 @@ private boolean isLocalOrSnapshotRecovery() { /** * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events + * + * @param fileSizeMap updated map of current snapshot of local segments to their sizes */ - private UploadListener createUploadListener() { + private UploadListener createUploadListener(Map fileSizeMap) { return new UploadListener() { private long uploadStartTime = 0; @Override public void beforeUpload(String file) { // Start tracking the upload bytes started - segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addUploadBytesStarted(fileSizeMap.get(file)); uploadStartTime = System.currentTimeMillis(); } @Override public void onSuccess(String file) { // Track upload success - segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addUploadBytesSucceeded(fileSizeMap.get(file)); segmentTracker.addToLatestUploadedFiles(file); segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime)); } @@ -555,7 +567,7 @@ public void onSuccess(String file) { @Override public void onFailure(String file) { // Track upload failure - segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addUploadBytesFailed(fileSizeMap.get(file)); segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime)); } };