A few improvements to AnalyticsProcessManager class that make the code more readable. (elastic#50026)
przemekwitek authored and SivagurunathanV committed Jan 21, 2020
1 parent b24d5b4 commit d8d90b6
Showing 3 changed files with 134 additions and 122 deletions.
@@ -233,15 +233,7 @@ private void startAnalytics(DataFrameAnalyticsTask task, DataFrameAnalyticsConfi
DataFrameAnalyticsTaskState analyzingState = new DataFrameAnalyticsTaskState(DataFrameAnalyticsState.ANALYZING,
task.getAllocationId(), null);
task.updatePersistentTaskState(analyzingState, ActionListener.wrap(
updatedTask -> processManager.runJob(task, config, dataExtractorFactory,
error -> {
if (error != null) {
task.updateState(DataFrameAnalyticsState.FAILED, error.getMessage());
} else {
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
task.markAsCompleted();
}
}),
updatedTask -> processManager.runJob(task, config, dataExtractorFactory),
error -> {
if (ExceptionsHelper.unwrapCause(error) instanceof ResourceNotFoundException) {
// Task has stopped
@@ -8,6 +8,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.search.SearchResponse;
@@ -90,19 +91,19 @@ public AnalyticsProcessManager(Client client,
this.trainedModelProvider = Objects.requireNonNull(trainedModelProvider);
}

public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory,
Consumer<Exception> finishHandler) {
public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractorFactory dataExtractorFactory) {
executorServiceForJob.execute(() -> {
ProcessContext processContext = new ProcessContext(config.getId());
ProcessContext processContext = new ProcessContext(config);
synchronized (processContextByAllocation) {
if (task.isStopping()) {
// The task was requested to stop before we created the process context
finishHandler.accept(null);
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
task.markAsCompleted();
return;
}
if (processContextByAllocation.putIfAbsent(task.getAllocationId(), processContext) != null) {
finishHandler.accept(
ExceptionsHelper.serverError("[" + config.getId() + "] Could not create process as one already exists"));
task.updateState(
DataFrameAnalyticsState.FAILED, "[" + config.getId() + "] Could not create process as one already exists");
return;
}
}
@@ -113,13 +114,13 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
// Fetch existing model state (if any)
BytesReference state = getModelState(config);

if (processContext.startProcess(dataExtractorFactory, config, task, state)) {
executorServiceForProcess.execute(() -> processResults(processContext));
executorServiceForProcess.execute(() -> processData(task, config, processContext.dataExtractor,
processContext.process, processContext.resultProcessor, finishHandler, state));
if (processContext.startProcess(dataExtractorFactory, task, state)) {
executorServiceForProcess.execute(() -> processContext.resultProcessor.get().process(processContext.process.get()));
executorServiceForProcess.execute(() -> processData(task, processContext, state));
} else {
processContextByAllocation.remove(task.getAllocationId());
finishHandler.accept(null);
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
task.markAsCompleted();
}
});
}
@@ -140,26 +141,18 @@ private BytesReference getModelState(DataFrameAnalyticsConfig config) {
}
}

private void processResults(ProcessContext processContext) {
private void processData(DataFrameAnalyticsTask task, ProcessContext processContext, BytesReference state) {
DataFrameAnalyticsConfig config = processContext.config;
DataFrameDataExtractor dataExtractor = processContext.dataExtractor.get();
AnalyticsProcess<AnalyticsResult> process = processContext.process.get();
AnalyticsResultProcessor resultProcessor = processContext.resultProcessor.get();
try {
processContext.resultProcessor.process(processContext.process);
} catch (Exception e) {
processContext.setFailureReason(e.getMessage());
}
}

private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor,
AnalyticsProcess<AnalyticsResult> process, AnalyticsResultProcessor resultProcessor,
Consumer<Exception> finishHandler, BytesReference state) {

try {
ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
writeHeaderRecord(dataExtractor, process);
writeDataRows(dataExtractor, process, config.getAnalysis(), task.getProgressTracker());
process.writeEndOfDataMessage();
process.flushStream();

restoreState(config, state, process, finishHandler);
restoreState(task, config, state, process);

LOGGER.info("[{}] Waiting for result processor to complete", config.getId());
resultProcessor.awaitForCompletion();
@@ -168,26 +161,34 @@ private void processData(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig c
refreshDest(config);
LOGGER.info("[{}] Result processor has completed", config.getId());
} catch (Exception e) {
if (task.isStopping() == false) {
String errorMsg = new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage())
.getFormattedMessage();
if (task.isStopping()) {
// Errors during task stopping are expected but we still want to log them just in case.
String errorMsg =
new ParameterizedMessage(
"[{}] Error while processing data [{}]; task is stopping", config.getId(), e.getMessage()).getFormattedMessage();
LOGGER.debug(errorMsg, e);
} else {
String errorMsg =
new ParameterizedMessage("[{}] Error while processing data [{}]", config.getId(), e.getMessage()).getFormattedMessage();
LOGGER.error(errorMsg, e);
processContextByAllocation.get(task.getAllocationId()).setFailureReason(errorMsg);
processContext.setFailureReason(errorMsg);
}
} finally {
closeProcess(task);

ProcessContext processContext = processContextByAllocation.remove(task.getAllocationId());
processContextByAllocation.remove(task.getAllocationId());
LOGGER.debug("Removed process context for task [{}]; [{}] processes still running", config.getId(),
processContextByAllocation.size());

if (processContext.getFailureReason() == null) {
// This results in marking the persistent task as complete
LOGGER.info("[{}] Marking task completed", config.getId());
finishHandler.accept(null);
auditor.info(config.getId(), Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS);
task.markAsCompleted();
} else {
LOGGER.error("[{}] Marking task failed; {}", config.getId(), processContext.getFailureReason());
task.updateState(DataFrameAnalyticsState.FAILED, processContext.getFailureReason());
// Note: We are not marking the task as failed here as we want the user to be able to inspect the failure reason.
}
}
}
@@ -239,8 +240,8 @@ private void writeHeaderRecord(DataFrameDataExtractor dataExtractor, AnalyticsPr
process.writeRecord(headerRecord);
}

private void restoreState(DataFrameAnalyticsConfig config, @Nullable BytesReference state, AnalyticsProcess<AnalyticsResult> process,
Consumer<Exception> failureHandler) {
private void restoreState(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config, @Nullable BytesReference state,
AnalyticsProcess<AnalyticsResult> process) {
if (config.getAnalysis().persistsState() == false) {
LOGGER.debug("[{}] Analysis does not support state", config.getId());
return;
@@ -258,7 +259,7 @@ private void restoreState(DataFrameAnalyticsConfig config, @Nullable BytesRefere
process.restoreState(state);
} catch (Exception e) {
LOGGER.error(new ParameterizedMessage("[{}] Failed to restore state", process.getConfig().jobId()), e);
failureHandler.accept(ExceptionsHelper.serverError("Failed to restore state", e));
task.updateState(DataFrameAnalyticsState.FAILED, "Failed to restore state: " + e.getMessage());
}
}

@@ -293,9 +294,10 @@ private void closeProcess(DataFrameAnalyticsTask task) {

ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
try {
processContext.process.close();
processContext.process.get().close();
LOGGER.info("[{}] Closed process", configId);
} catch (Exception e) {
LOGGER.error("[" + configId + "] Error closing data frame analyzer process", e);
String errorMsg = new ParameterizedMessage(
"[{}] Error closing data frame analyzer process [{}]", configId, e.getMessage()).getFormattedMessage();
processContext.setFailureReason(errorMsg);
@@ -323,79 +325,76 @@ int getProcessContextCount() {

class ProcessContext {

private final String id;
private volatile AnalyticsProcess<AnalyticsResult> process;
private volatile DataFrameDataExtractor dataExtractor;
private volatile AnalyticsResultProcessor resultProcessor;
private volatile boolean processKilled;
private volatile String failureReason;
private final DataFrameAnalyticsConfig config;
private final SetOnce<AnalyticsProcess<AnalyticsResult>> process = new SetOnce<>();
private final SetOnce<DataFrameDataExtractor> dataExtractor = new SetOnce<>();
private final SetOnce<AnalyticsResultProcessor> resultProcessor = new SetOnce<>();
private final SetOnce<String> failureReason = new SetOnce<>();

ProcessContext(String id) {
this.id = Objects.requireNonNull(id);
ProcessContext(DataFrameAnalyticsConfig config) {
this.config = Objects.requireNonNull(config);
}

synchronized String getFailureReason() {
return failureReason;
String getFailureReason() {
return failureReason.get();
}

synchronized void setFailureReason(String failureReason) {
// Only set the new reason if there isn't one already as we want to keep the first reason
if (this.failureReason == null && failureReason != null) {
this.failureReason = failureReason;
void setFailureReason(String failureReason) {
if (failureReason == null) {
return;
}
// Only set the new reason if there isn't one already as we want to keep the first reason (most likely the root cause).
this.failureReason.trySet(failureReason);
}

synchronized void stop() {
LOGGER.debug("[{}] Stopping process", id);
processKilled = true;
if (dataExtractor != null) {
dataExtractor.cancel();
LOGGER.debug("[{}] Stopping process", config.getId());
if (dataExtractor.get() != null) {
dataExtractor.get().cancel();
}
if (resultProcessor != null) {
resultProcessor.cancel();
if (resultProcessor.get() != null) {
resultProcessor.get().cancel();
}
if (process != null) {
if (process.get() != null) {
try {
process.kill();
process.get().kill();
} catch (IOException e) {
LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", id), e);
LOGGER.error(new ParameterizedMessage("[{}] Failed to kill process", config.getId()), e);
}
}
}

/**
* @return {@code true} if the process was started or {@code false} if it was not because it was stopped in the meantime
*/
synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory, DataFrameAnalyticsConfig config,
DataFrameAnalyticsTask task, @Nullable BytesReference state) {
if (processKilled) {
synchronized boolean startProcess(DataFrameDataExtractorFactory dataExtractorFactory,
DataFrameAnalyticsTask task,
@Nullable BytesReference state) {
if (task.isStopping()) {
// The job was stopped before we started the process so no need to start it
return false;
}

dataExtractor = dataExtractorFactory.newExtractor(false);
dataExtractor.set(dataExtractorFactory.newExtractor(false));
AnalyticsProcessConfig analyticsProcessConfig =
createProcessConfig(config, dataExtractor, dataExtractorFactory.getExtractedFields());
createProcessConfig(dataExtractor.get(), dataExtractorFactory.getExtractedFields());
LOGGER.trace("[{}] creating analytics process with config [{}]", config.getId(), Strings.toString(analyticsProcessConfig));
// If we have no rows, that means there is no data so no point in starting the native process
// just finish the task
if (analyticsProcessConfig.rows() == 0) {
LOGGER.info("[{}] no data found to analyze. Will not start analytics native process.", config.getId());
return false;
}
process = createProcess(task, config, analyticsProcessConfig, state);
DataFrameRowsJoiner dataFrameRowsJoiner = new DataFrameRowsJoiner(config.getId(), client,
dataExtractorFactory.newExtractor(true));
resultProcessor = new AnalyticsResultProcessor(
config, dataFrameRowsJoiner, task.getProgressTracker(), trainedModelProvider, auditor, dataExtractor.getFieldNames());
process.set(createProcess(task, config, analyticsProcessConfig, state));
resultProcessor.set(createResultProcessor(task, dataExtractorFactory));
return true;
}

private AnalyticsProcessConfig createProcessConfig(
DataFrameAnalyticsConfig config, DataFrameDataExtractor dataExtractor, ExtractedFields extractedFields) {
private AnalyticsProcessConfig createProcessConfig(DataFrameDataExtractor dataExtractor,
ExtractedFields extractedFields) {
DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary();
Set<String> categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis());
AnalyticsProcessConfig processConfig = new AnalyticsProcessConfig(
return new AnalyticsProcessConfig(
config.getId(),
dataSummary.rows,
dataSummary.cols,
@@ -405,7 +404,14 @@ private AnalyticsProcessConfig createProcessConfig(
categoricalFields,
config.getAnalysis(),
extractedFields);
return processConfig;
}

private AnalyticsResultProcessor createResultProcessor(DataFrameAnalyticsTask task,
DataFrameDataExtractorFactory dataExtractorFactory) {
DataFrameRowsJoiner dataFrameRowsJoiner =
new DataFrameRowsJoiner(config.getId(), client, dataExtractorFactory.newExtractor(true));
return new AnalyticsResultProcessor(
config, dataFrameRowsJoiner, task.getProgressTracker(), trainedModelProvider, auditor, dataExtractor.get().getFieldNames());
}
}
}
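
For reference, the core of the ProcessContext refactoring is Lucene's org.apache.lucene.util.SetOnce, whose trySet method gives first-write-wins semantics: the first failure reason recorded is kept and later ones are silently ignored. A minimal sketch of that pattern, assuming only the SetOnce API used in the diff (the FailureTracker class below is illustrative, not part of the commit):

import org.apache.lucene.util.SetOnce;

class FailureTracker {

    // First write wins: once a reason is set, later trySet calls are no-ops,
    // so the original (root-cause) failure reason is preserved.
    private final SetOnce<String> failureReason = new SetOnce<>();

    void setFailureReason(String reason) {
        if (reason == null) {
            return;
        }
        failureReason.trySet(reason); // returns false and does nothing if a reason was already set
    }

    String getFailureReason() {
        return failureReason.get(); // null until a reason has been set
    }
}

The same wrapper guards the process, dataExtractor and resultProcessor fields, which is what lets the class drop the volatile fields and the synchronized getFailureReason/setFailureReason accessors of the previous version.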