Skip to content

Commit

Permalink
Merge branch 'master' of github.com:elastic/elasticsearch
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Sanwald committed Jun 19, 2019
2 parents 7eae39a + 4eafad4 commit 1836d6f
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
Expand Down Expand Up @@ -66,7 +66,7 @@ public class AutodetectCommunicator implements Closeable {
private final AutodetectProcess autodetectProcess;
private final StateStreamer stateStreamer;
private final DataCountsReporter dataCountsReporter;
private final AutoDetectResultProcessor autoDetectResultProcessor;
private final AutodetectResultProcessor autodetectResultProcessor;
private final BiConsumer<Exception, Boolean> onFinishHandler;
private final ExecutorService autodetectWorkerExecutor;
private final NamedXContentRegistry xContentRegistry;
Expand All @@ -75,15 +75,15 @@ public class AutodetectCommunicator implements Closeable {
private volatile boolean processKilled;

AutodetectCommunicator(Job job, Environment environment, AutodetectProcess process, StateStreamer stateStreamer,
DataCountsReporter dataCountsReporter, AutoDetectResultProcessor autoDetectResultProcessor,
DataCountsReporter dataCountsReporter, AutodetectResultProcessor autodetectResultProcessor,
BiConsumer<Exception, Boolean> onFinishHandler, NamedXContentRegistry xContentRegistry,
ExecutorService autodetectWorkerExecutor) {
this.job = job;
this.environment = environment;
this.autodetectProcess = process;
this.stateStreamer = stateStreamer;
this.dataCountsReporter = dataCountsReporter;
this.autoDetectResultProcessor = autoDetectResultProcessor;
this.autodetectResultProcessor = autodetectResultProcessor;
this.onFinishHandler = onFinishHandler;
this.xContentRegistry = xContentRegistry;
this.autodetectWorkerExecutor = autodetectWorkerExecutor;
Expand Down Expand Up @@ -120,7 +120,7 @@ public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistr
}

CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription());
DataToProcessWriter autodetectWriter = createProcessWriter(params.getDataDescription());

if (includeTokensField && categorizationAnalyzer == null) {
createCategorizationAnalyzer(analysisRegistry);
Expand All @@ -129,14 +129,14 @@ public void writeToJob(InputStream inputStream, AnalysisRegistry analysisRegistr
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<DataCounts> dataCountsAtomicReference = new AtomicReference<>();
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
autoDetectWriter.write(countingStream, categorizationAnalyzer, xContentType, (dataCounts, e) -> {
autodetectWriter.write(countingStream, categorizationAnalyzer, xContentType, (dataCounts, e) -> {
dataCountsAtomicReference.set(dataCounts);
exceptionAtomicReference.set(e);
latch.countDown();
});

latch.await();
autoDetectWriter.flushStream();
autodetectWriter.flushStream();

if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
Expand Down Expand Up @@ -168,7 +168,7 @@ public void close(boolean restart, String reason) {
killProcess(false, false);
stateStreamer.cancel();
}
autoDetectResultProcessor.awaitCompletion();
autodetectResultProcessor.awaitCompletion();
} finally {
onFinishHandler.accept(restart ? new ElasticsearchException(reason) : null, true);
}
Expand Down Expand Up @@ -199,13 +199,13 @@ public void killProcess(boolean awaitCompletion, boolean finish) throws IOExcept
public void killProcess(boolean awaitCompletion, boolean finish, boolean finalizeJob) throws IOException {
try {
processKilled = true;
autoDetectResultProcessor.setProcessKilled();
autodetectResultProcessor.setProcessKilled();
autodetectWorkerExecutor.shutdown();
autodetectProcess.kill();

if (awaitCompletion) {
try {
autoDetectResultProcessor.awaitCompletion();
autodetectResultProcessor.awaitCompletion();
} catch (TimeoutException e) {
LOGGER.warn(new ParameterizedMessage("[{}] Timed out waiting for killed job", job.getId()), e);
}
Expand Down Expand Up @@ -289,20 +289,20 @@ FlushAcknowledgement waitFlushToCompletion(String flushId) throws InterruptedExc

FlushAcknowledgement flushAcknowledgement;
try {
flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
flushAcknowledgement = autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
while (flushAcknowledgement == null) {
checkProcessIsAlive();
checkResultsProcessorIsAlive();
flushAcknowledgement = autoDetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
flushAcknowledgement = autodetectResultProcessor.waitForFlushAcknowledgement(flushId, FLUSH_PROCESS_CHECK_FREQUENCY);
}
} finally {
autoDetectResultProcessor.clearAwaitingFlush(flushId);
autodetectResultProcessor.clearAwaitingFlush(flushId);
}

if (processKilled == false) {
// We also have to wait for the normalizer to become idle so that we block
// clients from querying results in the middle of normalization.
autoDetectResultProcessor.waitUntilRenormalizerIsIdle();
autodetectResultProcessor.waitUntilRenormalizerIsIdle();

LOGGER.debug("[{}] Flush completed", job.getId());
}
Expand All @@ -321,7 +321,7 @@ private void checkProcessIsAlive() {
}

private void checkResultsProcessorIsAlive() {
if (autoDetectResultProcessor.isFailed()) {
if (autodetectResultProcessor.isFailed()) {
// Don't log here - it just causes double logging when the exception gets logged
throw new ElasticsearchException("[{}] Unexpected death of the result processor", job.getId());
}
Expand All @@ -332,11 +332,11 @@ public ZonedDateTime getProcessStartTime() {
}

public ModelSizeStats getModelSizeStats() {
return autoDetectResultProcessor.modelSizeStats();
return autodetectResultProcessor.modelSizeStats();
}

public TimingStats getTimingStats() {
return autoDetectResultProcessor.timingStats();
return autodetectResultProcessor.timingStats();
}

public DataCounts getDataCounts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
Expand Down Expand Up @@ -500,18 +500,18 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
}

// A TP with no queue, so that we fail immediately if there are no threads available
ExecutorService autoDetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
ExecutorService autodetectExecutorService = threadPool.executor(MachineLearning.JOB_COMMS_THREAD_POOL_NAME);
DataCountsReporter dataCountsReporter = new DataCountsReporter(job, autodetectParams.dataCounts(), jobDataCountsPersister);
ScoresUpdater scoresUpdater = new ScoresUpdater(job, jobResultsProvider,
new JobRenormalizedResultsPersister(job.getId(), client), normalizerFactory);
ExecutorService renormalizerExecutorService = threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME);
Renormalizer renormalizer = new ShortCircuitingRenormalizer(jobId, scoresUpdater,
renormalizerExecutorService);

AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autoDetectExecutorService,
AutodetectProcess process = autodetectProcessFactory.createAutodetectProcess(job, autodetectParams, autodetectExecutorService,
onProcessCrash(jobTask));
AutoDetectResultProcessor processor =
new AutoDetectResultProcessor(
AutodetectResultProcessor processor =
new AutodetectResultProcessor(
client,
auditor,
jobId,
Expand All @@ -521,8 +521,8 @@ AutodetectCommunicator create(JobTask jobTask, Job job, AutodetectParams autodet
autodetectParams.timingStats());
ExecutorService autodetectWorkerExecutor;
try (ThreadContext.StoredContext ignore = threadPool.getThreadContext().stashContext()) {
autodetectWorkerExecutor = createAutodetectExecutorService(autoDetectExecutorService);
autoDetectExecutorService.submit(() -> processor.process(process));
autodetectWorkerExecutor = createAutodetectExecutorService(autodetectExecutorService);
autodetectExecutorService.submit(() -> processor.process(process));
} catch (EsRejectedExecutionException e) {
// If submitting the operation to read the results from the process fails we need to close
// the process too, so that other submitted operations to threadpool are stopped.
Expand Down Expand Up @@ -734,9 +734,9 @@ public Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> getStatis
}

ExecutorService createAutodetectExecutorService(ExecutorService executorService) {
AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
executorService.submit(autoDetectWorkerExecutor::start);
return autoDetectWorkerExecutor;
AutodetectWorkerExecutorService autodetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
executorService.submit(autodetectWorkerExecutor::start);
return autodetectWorkerExecutor;
}

public ByteSizeValue getMinLocalStorageAvailable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@
* interim results and the old interim results have to be cleared out
* before the new ones are written.
*/
public class AutoDetectResultProcessor {
public class AutodetectResultProcessor {

private static final Logger LOGGER = LogManager.getLogger(AutoDetectResultProcessor.class);
private static final Logger LOGGER = LogManager.getLogger(AutodetectResultProcessor.class);

private final Client client;
private final Auditor auditor;
Expand Down Expand Up @@ -100,14 +100,14 @@ public class AutoDetectResultProcessor {
*/
private TimingStats persistedTimingStats; // only used from the process() thread, so doesn't need to be volatile

public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
public AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
JobResultsPersister persister,
ModelSizeStats latestModelSizeStats,
TimingStats timingStats) {
this(client, auditor, jobId, renormalizer, persister, latestModelSizeStats, timingStats, new FlushListener());
}

AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
AutodetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer,
JobResultsPersister persister, ModelSizeStats latestModelSizeStats, TimingStats timingStats,
FlushListener flushListener) {
this.client = Objects.requireNonNull(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.persistence.RecordsQueryBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.job.results.BucketTests;
Expand Down Expand Up @@ -75,7 +75,7 @@ public class AutodetectResultProcessorIT extends MlSingleNodeTestCase {

private JobResultsProvider jobResultsProvider;
private List<ModelSnapshot> capturedUpdateModelSnapshotOnJobRequests;
private AutoDetectResultProcessor resultProcessor;
private AutodetectResultProcessor resultProcessor;
private Renormalizer renormalizer;

@Override
Expand All @@ -91,7 +91,7 @@ public void createComponents() throws Exception {
jobResultsProvider = new JobResultsProvider(client(), builder.build());
renormalizer = mock(Renormalizer.class);
capturedUpdateModelSnapshotOnJobRequests = new ArrayList<>();
resultProcessor = new AutoDetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
resultProcessor = new AutodetectResultProcessor(client(), auditor, JOB_ID, renormalizer,
new JobResultsPersister(client()), new ModelSizeStats.Builder(JOB_ID).build(), new TimingStats(JOB_ID)) {
@Override
protected void updateModelSnapshotOnJob(ModelSnapshot modelSnapshot) {
Expand Down
Loading

0 comments on commit 1836d6f

Please sign in to comment.