Skip to content

Commit

Permalink
Create a clone of local segements size map used for Remote Segment St…
Browse files Browse the repository at this point in the history
…ats until sync to remote completes

Signed-off-by: bansvaru <[email protected]>
  • Loading branch information
linuxpi committed Jan 16, 2024
1 parent c132db9 commit 8d6c34b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public Map<String, Long> getLatestLocalFileNameLengthMap() {
* @param segmentFiles list of local refreshed segment files
* @param fileSizeFunction function is used to determine the file size in bytes
*/
public void updateLatestLocalFileNameLengthMap(
public Map<String, Long> updateLatestLocalFileNameLengthMap(
Collection<String> segmentFiles,
CheckedFunction<String, Long, IOException> fileSizeFunction
) {
Expand All @@ -332,6 +332,7 @@ public void updateLatestLocalFileNameLengthMap(
// Remove keys from the fileSizeMap that do not exist in the latest segment files
latestLocalFileNameLengthMap.entrySet().removeIf(entry -> fileSet.contains(entry.getKey()) == false);
computeBytesLag();
return Collections.unmodifiableMap(latestLocalFileNameLengthMap);
}

public void addToLatestUploadedFiles(String file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private boolean syncSegments() {
Collection<String> localSegmentsPostRefresh = segmentInfos.files(true);

// Create a map of file name to size and update the refresh segment tracker
updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
Map<String, Long> localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh);
CountDownLatch latch = new CountDownLatch(1);
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
Expand All @@ -231,6 +231,7 @@ public void onResponse(Void unused) {
refreshClockTimeMs,
refreshSeqNo,
lastRefreshedCheckpoint,
localSegmentsSizeMap,
checkpoint
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
Expand All @@ -251,7 +252,7 @@ public void onFailure(Exception e) {
}, latch);

// Start the segments files upload
uploadNewSegments(localSegmentsPostRefresh, segmentUploadsCompletedListener);
uploadNewSegments(localSegmentsPostRefresh, localSegmentsSizeMap, segmentUploadsCompletedListener);
latch.await();
} catch (EngineException e) {
logger.warn("Exception while reading SegmentInfosSnapshot", e);
Expand Down Expand Up @@ -295,10 +296,11 @@ private void onSuccessfulSegmentsSync(
long refreshClockTimeMs,
long refreshSeqNo,
long lastRefreshedCheckpoint,
Map<String, Long> localFileSizeMap,
ReplicationCheckpoint checkpoint
) {
// Update latest uploaded segment files name in segment tracker
segmentTracker.setLatestUploadedFiles(segmentTracker.getLatestLocalFileNameLengthMap().keySet());
segmentTracker.setLatestUploadedFiles(localFileSizeMap.keySet());
// Update the remote refresh time and refresh seq no
updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo);
// Reset the backoffDelayIterator for the future failures
Expand Down Expand Up @@ -371,7 +373,11 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
}
}

private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, ActionListener<Void> listener) {
private void uploadNewSegments(
Collection<String> localSegmentsPostRefresh,
Map<String, Long> localSegmentsSizeMap,
ActionListener<Void> listener
) {
Collection<String> filteredFiles = localSegmentsPostRefresh.stream().filter(file -> !skipUpload(file)).collect(Collectors.toList());
if (filteredFiles.size() == 0) {
logger.debug("No new segments to upload in uploadNewSegments");
Expand All @@ -385,7 +391,7 @@ private void uploadNewSegments(Collection<String> localSegmentsPostRefresh, Acti

for (String src : filteredFiles) {
// Initializing listener here to ensure that the stats increment operations are thread-safe
UploadListener statsListener = createUploadListener();
UploadListener statsListener = createUploadListener(localSegmentsSizeMap);
ActionListener<Void> aggregatedListener = ActionListener.wrap(resp -> {
statsListener.onSuccess(src);
batchUploadListener.onResponse(resp);
Expand Down Expand Up @@ -445,8 +451,8 @@ private void updateRemoteRefreshTimeAndSeqNo(long refreshTimeMs, long refreshClo
*
* @param segmentFiles list of segment files that are part of the most recent local refresh.
*/
private void updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
private Map<String, Long> updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
return segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
}

private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) {
Expand Down Expand Up @@ -522,29 +528,29 @@ private boolean isLocalOrSnapshotRecovery() {
/**
* Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events
*/
private UploadListener createUploadListener() {
private UploadListener createUploadListener(Map<String, Long> fileSizeMap) {
return new UploadListener() {
private long uploadStartTime = 0;

@Override
public void beforeUpload(String file) {
// Start tracking the upload bytes started
segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesStarted(fileSizeMap.get(file));
uploadStartTime = System.currentTimeMillis();
}

@Override
public void onSuccess(String file) {
// Track upload success
segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesSucceeded(fileSizeMap.get(file));
segmentTracker.addToLatestUploadedFiles(file);
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}

@Override
public void onFailure(String file) {
// Track upload failure
segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file));
segmentTracker.addUploadBytesFailed(fileSizeMap.get(file));
segmentTracker.addUploadTimeInMillis(Math.max(1, System.currentTimeMillis() - uploadStartTime));
}
};
Expand Down

0 comments on commit 8d6c34b

Please sign in to comment.