Skip to content

Commit

Permalink
[7.5][ML] Always refresh dest index before starting analytics process (
Browse files Browse the repository at this point in the history
…elastic#48090)

If a job stops right after reindexing is finished but before
we refresh the destination index, we don't refresh at all.
If the job is started again right after, it jumps into the analyzing state.
However, the data is still not searchable.
This is why we were seeing test failures where we start the process
expecting X rows (where X is lower than the total number of docs)
and we end up getting more than X.

We fix this by moving the refresh of the dest index right before
we start the process so it always ensures the data is searchable.

Closes elastic#47612

Backport of elastic#48090
  • Loading branch information
dimitris-athanasiou committed Oct 17, 2019
1 parent 86fef7d commit 3fbffa6
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception
"Finished analysis");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
public void testStopAndRestart() throws Exception {
initialize("regression_stop_and_restart");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,6 @@ public void testLazyAssignmentWithModelMemoryLimitTooHighForAssignment() throws
"Stopped analytics");
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47612")
public void testOutlierDetectionStopAndRestart() throws Exception {
String sourceIndex = "test-outlier-detection-stop-and-restart";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -160,34 +157,18 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
}

// Reindexing is complete; start analytics
ActionListener<RefreshResponse> refreshListener = ActionListener.wrap(
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
refreshResponse -> {
if (task.isStopping()) {
LOGGER.debug("[{}] Stopping before starting analytics process", config.getId());
return;
}
task.setReindexingTaskId(null);
startAnalytics(task, config, false);
},
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
);

// Refresh to ensure copied index is fully searchable
ActionListener<BulkByScrollResponse> reindexCompletedListener = ActionListener.wrap(
bulkResponse -> {
if (task.isStopping()) {
LOGGER.debug("[{}] Stopping before refreshing destination index", config.getId());
return;
}
task.setReindexingFinished();
auditor.info(
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
ClientHelper.executeAsyncWithOrigin(client,
ClientHelper.ML_ORIGIN,
RefreshAction.INSTANCE,
new RefreshRequest(config.getDest().getIndex()),
refreshListener);
startAnalytics(task, config, false);
},
error -> task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public StartDataFrameAnalyticsAction.TaskParams getParams() {
}

public void setReindexingTaskId(Long reindexingTaskId) {
    // Capture the outgoing id first so the transition can be traced in the logs.
    Long previousTaskId = this.reindexingTaskId;
    LOGGER.debug("[{}] Setting reindexing task id to [{}] from [{}]", taskParams.getId(), reindexingTaskId, previousTaskId);
    this.reindexingTaskId = reindexingTaskId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ public List<String> getFieldNames() {
/**
 * Executes the data summary search synchronously and returns a summary of
 * the data set: the number of hit rows together with the number of fields
 * that will be extracted.
 *
 * Note: the scraped diff had retained the pre-change line
 * ({@code return new DataSummary(searchResponse.getHits()...)}) alongside the
 * replacement lines, leaving unreachable statements after a return — a Java
 * compile error. Only the post-change implementation is kept here.
 */
public DataSummary collectDataSummary() {
    SearchRequestBuilder searchRequestBuilder = buildDataSummarySearchRequestBuilder();
    SearchResponse searchResponse = executeSearchRequest(searchRequestBuilder);
    // Log the row count to help diagnose mismatches between expected and
    // observed rows when the process starts (see elastic#47612).
    long rows = searchResponse.getHits().getTotalHits().value;
    LOGGER.debug("[{}] Data summary rows [{}]", context.jobId, rows);
    return new DataSummary(rows, context.extractedFields.getAllFields().size());
}

public void collectDataSummaryAsync(ActionListener<DataSummary> dataSummaryActionListener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
return;
}

// First we refresh the dest index to ensure data is searchable
refreshDest(config);

ProcessContext processContext = new ProcessContext(config.getId());
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
finishHandler.accept(ExceptionsHelper.serverError("[" + processContext.id
Expand Down Expand Up @@ -143,10 +146,12 @@ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig c
refreshDest(config);
LOGGER.info("[{}] Result processor has completed", config.getId());
} catch (Exception e) {
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
.getFormattedMessage();
LOGGER.error(errorMsg, e);
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
if (task.isStopping() == false) {
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
.getFormattedMessage();
LOGGER.error(errorMsg, e);
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
}
} finally {
closeProcess(task);

Expand Down

0 comments on commit 3fbffa6

Please sign in to comment.