[ML] Only report complete writing_results progress after completion #49551

Merged
@@ -37,6 +37,18 @@ public class AnalyticsResultProcessor {

     private static final Logger LOGGER = LogManager.getLogger(AnalyticsResultProcessor.class);

+    /**
+     * While we report progress as we read row results there are other things we need to account for
+     * to report completion. There are other types of results we can't predict the number of like
+     * progress objects and the inference model. Thus, we report a max progress until we know we have
+     * completed processing results.
+     *
+     * It is critical to ensure we do not report complete progress too soon as restarting a job
+     * uses the progress to determine which state to restart from. If we report full progress too soon
+     * we cannot restart a job as we will think the job was finished.
+     */
+    private static final int MAX_PROGRESS_BEFORE_COMPLETION = 98;
+
     private final DataFrameAnalyticsConfig analytics;
     private final DataFrameRowsJoiner dataFrameRowsJoiner;
     private final ProgressTracker progressTracker;
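
The Javadoc above describes holding row-driven progress below 100 until all results are known to be processed. A minimal, self-contained sketch of that idea follows. The class `CappedProgressSketch`, its `reported()` accessor, and the `percentOf` helper are hypothetical names used only for illustration, and the `AtomicInteger` is an assumed stand-in for `progressTracker.writingResultsPercent`; only the constant and the percentage expression are taken from the diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the writing_results part of the progress tracker. */
class CappedProgressSketch {

    // Same value as MAX_PROGRESS_BEFORE_COMPLETION in the diff.
    static final int MAX_PROGRESS_BEFORE_COMPLETION = 98;

    // Assumption: the real tracker exposes something with set(int); AtomicInteger is used here.
    private final AtomicInteger writingResultsPercent = new AtomicInteger(0);

    /** Row-driven updates are clamped so they can never signal completion on their own. */
    void updateResultsProgress(int progress) {
        writingResultsPercent.set(Math.min(progress, MAX_PROGRESS_BEFORE_COMPLETION));
    }

    /** Intended to be called only once every kind of result has been processed. */
    void completeResultsProgress() {
        writingResultsPercent.set(100);
    }

    int reported() {
        return writingResultsPercent.get();
    }

    /** Mirrors the percentage expression used in the diff. */
    static int percentOf(long processedRows, long totalRows) {
        return processedRows >= totalRows ? 100 : (int) (processedRows * 100.0 / totalRows);
    }

    public static void main(String[] args) {
        CappedProgressSketch tracker = new CappedProgressSketch();
        long totalRows = 4;
        for (long processed = 1; processed <= totalRows; processed++) {
            tracker.updateResultsProgress(percentOf(processed, totalRows));
            System.out.println("after row " + processed + ": " + tracker.reported());
        }
        tracker.completeResultsProgress();
        System.out.println("after completion: " + tracker.reported());
    }
}
```

With four rows, the reported values are 25, 50, 75 and 98 while rows are read, and 100 only after the explicit completion call. Even when every row has been seen, the value stays at the cap until then.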
@@ -91,14 +103,9 @@ public void process(AnalyticsProcess<AnalyticsResult> process) {
                 processResult(result, resultsJoiner);
                 if (result.getRowResults() != null) {
                     processedRows++;
-                    progressTracker.writingResultsPercent.set(processedRows >= totalRows ? 100 : (int) (processedRows * 100.0 / totalRows));
+                    updateResultsProgress(processedRows >= totalRows ? 100 : (int) (processedRows * 100.0 / totalRows));
                 }
             }
-            if (isCancelled == false) {
-                // This means we completed successfully so we need to set the progress to 100.
-                // This is because due to skipped rows, it is possible the processed rows will not reach the total rows.
-                progressTracker.writingResultsPercent.set(100);
-            }
         } catch (Exception e) {
             if (isCancelled) {
                 // No need to log error as it's due to stopping
@@ -107,11 +114,22 @@ public void process(AnalyticsProcess<AnalyticsResult> process) {
                 failure = "error parsing data frame analytics output: [" + e.getMessage() + "]";
             }
         } finally {
+            if (isCancelled == false) {
Member:

Should we take into account failure != null? It seems weird to me that we are "complete" even if we failed.

Contributor Author:

Very good point. I also took the opportunity to improve failure reporting in that class in general.

+                completeResultsProgress();
+            }
             completionLatch.countDown();
             process.consumeAndCloseOutputStream();
         }
     }

+    private void updateResultsProgress(int progress) {
+        progressTracker.writingResultsPercent.set(Math.min(progress, MAX_PROGRESS_BEFORE_COMPLETION));
+    }
+
+    private void completeResultsProgress() {
+        progressTracker.writingResultsPercent.set(100);
+    }
+
     private void processResult(AnalyticsResult result, DataFrameRowsJoiner resultsJoiner) {
         RowResults rowResults = result.getRowResults();
         if (rowResults != null) {
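
The review thread above asks whether completion should also depend on `failure != null`. The follow-up change is not shown in this commit, so the sketch below is only one possible reading of that suggestion, not the merged code. `CompletionGuardSketch`, `cancel()`, `reported()`, `getFailure()` and the `Runnable` standing in for the result-reading loop are all hypothetical; the failure message format is copied from the diff.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for the finally block discussed in the review. */
class CompletionGuardSketch {

    private final AtomicInteger writingResultsPercent = new AtomicInteger(0);
    private volatile boolean isCancelled;
    private volatile String failure;

    void cancel() {
        isCancelled = true;
    }

    /** Runs a stand-in for the result-reading loop, then decides whether to report 100. */
    void process(Runnable resultLoop) {
        try {
            resultLoop.run();
        } catch (Exception e) {
            if (isCancelled == false) {
                failure = "error parsing data frame analytics output: [" + e.getMessage() + "]";
            }
        } finally {
            // Assumption: "complete" requires neither a cancellation nor a recorded failure.
            if (isCancelled == false && failure == null) {
                writingResultsPercent.set(100);
            }
        }
    }

    int reported() {
        return writingResultsPercent.get();
    }

    String getFailure() {
        return failure;
    }

    public static void main(String[] args) {
        CompletionGuardSketch ok = new CompletionGuardSketch();
        ok.process(() -> { });
        System.out.println("success: " + ok.reported());

        CompletionGuardSketch failed = new CompletionGuardSketch();
        failed.process(() -> { throw new IllegalStateException("bad output"); });
        System.out.println("failure: " + failed.reported() + " (" + failed.getFailure() + ")");
    }
}
```

Under this guard, a run that throws leaves the progress below 100, so restart logic that reads the progress would not mistake a failed job for a finished one.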