Skip to content

Commit

Permalink
Make only a part of stop() method a critical section. (#49756)
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Dec 3, 2019
1 parent 8f42719 commit ada3ca6
Showing 1 changed file with 8 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void runJob(DataFrameAnalyticsTask task, DataFrameAnalyticsConfig config,
Consumer<Exception> finishHandler) {
executorServiceForJob.execute(() -> {
ProcessContext processContext = new ProcessContext(config.getId());
synchronized (this) {
synchronized (processContextByAllocation) {
if (task.isStopping()) {
// The task was requested to stop before we created the process context
finishHandler.accept(null);
Expand Down Expand Up @@ -295,14 +295,17 @@ private void closeProcess(DataFrameAnalyticsTask task) {
processContext.process.close();
LOGGER.info("[{}] Closed process", configId);
} catch (Exception e) {
String errorMsg = new ParameterizedMessage("[{}] Error closing data frame analyzer process [{}]"
, configId, e.getMessage()).getFormattedMessage();
String errorMsg = new ParameterizedMessage(
"[{}] Error closing data frame analyzer process [{}]", configId, e.getMessage()).getFormattedMessage();
processContext.setFailureReason(errorMsg);
}
}

public synchronized void stop(DataFrameAnalyticsTask task) {
ProcessContext processContext = processContextByAllocation.get(task.getAllocationId());
public void stop(DataFrameAnalyticsTask task) {
ProcessContext processContext;
synchronized (processContextByAllocation) {
processContext = processContextByAllocation.get(task.getAllocationId());
}
if (processContext != null) {
LOGGER.debug("[{}] Stopping process", task.getParams().getId());
processContext.stop();
Expand Down

0 comments on commit ada3ca6

Please sign in to comment.