diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java index 43303c0dcfa6e..9b730db7ec59b 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/ExecutionService.java @@ -534,12 +534,12 @@ private void clearExecutions() { // the watch execution task takes another runnable as parameter // the best solution would be to move the whole execute() method, which is handed over as ctor parameter // over into this class, this is the quicker way though - static final class WatchExecutionTask implements Runnable { + public static final class WatchExecutionTask implements Runnable { private final WatchExecutionContext ctx; private final Runnable runnable; - WatchExecutionTask(WatchExecutionContext ctx, Runnable runnable) { + public WatchExecutionTask(WatchExecutionContext ctx, Runnable runnable) { this.ctx = ctx; this.runnable = runnable; } diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java index b13ba9780bf74..a049267594611 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/execute/TransportExecuteWatchAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.watcher.execution.ActionExecutionMode; +import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.core.watcher.history.WatchRecord; import org.elasticsearch.xpack.core.watcher.support.xcontent.WatcherParams; import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchAction; @@ -110,48 +111,63 @@ protected void doExecute(ExecuteWatchRequest request, ActionListener listener, - Watch watch, boolean knownWatch) { - - threadPool.executor(XPackField.WATCHER).submit(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); + private void executeWatch( + final ExecuteWatchRequest request, + final ActionListener listener, + final Watch watch, + final boolean knownWatch) { + try { + /* + * Ensure that the headers from the incoming request are used instead those of the stored watch otherwise the watch would run + * as the user who stored the watch, but it needs to run as the user who executes this request. + */ + final Map headers = new HashMap<>(threadPool.getThreadContext().getHeaders()); + watch.status().setHeaders(headers); + + final String triggerType = watch.trigger().type(); + final TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData()); + + final ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder( + watch, + knownWatch, + new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod()); + + final ZonedDateTime executionTime = clock.instant().atZone(ZoneOffset.UTC); + ctxBuilder.executionTime(executionTime); + for (final Map.Entry entry : request.getActionModes().entrySet()) { + ctxBuilder.actionMode(entry.getKey(), entry.getValue()); + } + if (request.getAlternativeInput() != null) { + ctxBuilder.withInput(new SimpleInput.Result(new Payload.Simple(request.getAlternativeInput()))); } + if (request.isIgnoreCondition()) { + ctxBuilder.withCondition(InternalAlwaysCondition.RESULT_INSTANCE); + } + ctxBuilder.recordExecution(request.isRecordExecution()); + final WatchExecutionContext ctx = ctxBuilder.build(); - @Override - protected void doRun() throws Exception { - // ensure that the headers from the incoming request are used instead those of the stored watch - // otherwise the watch would run as the user who stored the watch, but it needs to be run as the user who - // executes this request - Map headers = new HashMap<>(threadPool.getThreadContext().getHeaders()); - watch.status().setHeaders(headers); + // use execute so that the runnable is not wrapped in a RunnableFuture + threadPool.executor(XPackField.WATCHER).execute(new ExecutionService.WatchExecutionTask(ctx, new AbstractRunnable() { - String triggerType = watch.trigger().type(); - TriggerEvent triggerEvent = triggerService.simulateEvent(triggerType, watch.id(), request.getTriggerData()); + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } - ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch, knownWatch, - new ManualTriggerEvent(triggerEvent.jobName(), triggerEvent), executionService.defaultThrottlePeriod()); + @Override + protected void doRun() throws Exception { + final WatchRecord record = executionService.execute(ctx); + final XContentBuilder builder = XContentFactory.jsonBuilder(); - ZonedDateTime executionTime = clock.instant().atZone(ZoneOffset.UTC); - ctxBuilder.executionTime(executionTime); - for (Map.Entry entry : request.getActionModes().entrySet()) { - ctxBuilder.actionMode(entry.getKey(), entry.getValue()); + record.toXContent(builder, WatcherParams.builder().hideSecrets(true).debug(request.isDebug()).build()); + listener.onResponse(new ExecuteWatchResponse(record.id().value(), BytesReference.bytes(builder), XContentType.JSON)); } - if (request.getAlternativeInput() != null) { - ctxBuilder.withInput(new SimpleInput.Result(new Payload.Simple(request.getAlternativeInput()))); - } - if (request.isIgnoreCondition()) { - ctxBuilder.withCondition(InternalAlwaysCondition.RESULT_INSTANCE); - } - ctxBuilder.recordExecution(request.isRecordExecution()); - WatchRecord record = executionService.execute(ctxBuilder.build()); - XContentBuilder builder = XContentFactory.jsonBuilder(); + })); + } catch (final Exception e) { + listener.onFailure(e); + } + - record.toXContent(builder, WatcherParams.builder().hideSecrets(true).debug(request.isDebug()).build()); - listener.onResponse(new ExecuteWatchResponse(record.id().value(), BytesReference.bytes(builder), XContentType.JSON)); - } - }); } } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecuteWatchQueuedStatsTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecuteWatchQueuedStatsTests.java new file mode 100644 index 0000000000000..d4686652eaeff --- /dev/null +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/ExecuteWatchQueuedStatsTests.java @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.watcher.execution; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.watcher.client.WatchSourceBuilder; +import org.elasticsearch.xpack.core.watcher.client.WatcherClient; +import org.elasticsearch.xpack.core.watcher.execution.ActionExecutionMode; +import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchRequest; +import org.elasticsearch.xpack.core.watcher.transport.actions.execute.ExecuteWatchResponse; +import org.elasticsearch.xpack.core.watcher.transport.actions.stats.WatcherStatsResponse; +import org.elasticsearch.xpack.watcher.actions.index.IndexAction; +import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase; +import org.elasticsearch.xpack.watcher.trigger.manual.ManualTriggerEvent; +import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent; + +import java.io.IOException; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput; +import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; +import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; +import static org.hamcrest.Matchers.empty; + +public class ExecuteWatchQueuedStatsTests extends AbstractWatcherIntegrationTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + // we use a small thread pool to force executions to be queued + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put("xpack.watcher.thread_pool.size", 1).build(); + } + + @Override + protected boolean timeWarped() { + return false; + } + + /* + * This test is effectively forcing a manually executed watch to end up queued while we simultaneously try to get stats, including + * queued watches. The reason that we do this is because previously a manually executed watch would be queued as a FutureTask while + * we try to cast queued watches to WatchExecutionTask. This would previously result in a ClassCastException. This test fails when that + * happens yet succeeds with the production code change that accompanies this test. + */ + public void testQueuedStats() throws ExecutionException, InterruptedException { + final WatcherClient client = new WatcherClient(client()); + client.preparePutWatch("id") + .setActive(true) + .setSource( + new WatchSourceBuilder() + .input(simpleInput("payload", "yes")) + .trigger(schedule(interval("1s"))) + .addAction( + "action", + TimeValue.timeValueSeconds(1), + IndexAction.builder("test_index", "acknowledgement").setDocId("id"))) + .get(); + + final int numberOfIterations = 128 - scaledRandomIntBetween(0, 128); + + final CyclicBarrier barrier = new CyclicBarrier(2); + + final List> futures = new ArrayList<>(numberOfIterations); + final Thread executeWatchThread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + fail(e.toString()); + } + for (int i = 0; i < numberOfIterations; i++) { + final ExecuteWatchRequest request = new ExecuteWatchRequest("id"); + try { + request.setTriggerEvent(new ManualTriggerEvent( + "id-" + i, + new ScheduleTriggerEvent(ZonedDateTime.now(ZoneOffset.UTC), ZonedDateTime.now(ZoneOffset.UTC)))); + } catch (final IOException e) { + fail(e.toString()); + } + request.setActionMode("_all", ActionExecutionMode.EXECUTE); + request.setRecordExecution(true); + futures.add(client.executeWatch(request)); + } + }); + executeWatchThread.start(); + + final List failures = new ArrayList<>(); + final Thread watcherStatsThread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + fail(e.toString()); + } + for (int i = 0; i < numberOfIterations; i++) { + final WatcherStatsResponse response = client.prepareWatcherStats().setIncludeQueuedWatches(true).get(); + failures.addAll(response.failures()); + } + }); + watcherStatsThread.start(); + + executeWatchThread.join(); + watcherStatsThread.join(); + + for (final ActionFuture future : futures) { + future.get(); + } + + assertThat(failures, empty()); + + client.prepareDeleteWatch("id").get(); + } + +}