Skip to content

Commit

Permalink
Move async task maintenance service to core plugin
Browse files Browse the repository at this point in the history
The async task task maintenance service is used by both async search plugin
as well as EQL plugin. So it needs to reside in the core.

Relates to elastic#49638
  • Loading branch information
imotov committed Jun 4, 2020
1 parent d197a85 commit cc05e1b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,49 +7,28 @@

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;

public final class AsyncSearch extends Plugin implements ActionPlugin {
private final Settings settings;

public AsyncSearch(Settings settings) {
this.settings = settings;
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
Expand All @@ -71,31 +50,6 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
);
}

@Override
public Collection<Object> createComponents(Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
if (DiscoveryNode.isDataNode(environment.settings())) {
// only data nodes should be eligible to run the maintenance service.
AsyncTaskIndexService<AsyncSearchResponse> indexService =
new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadPool.getThreadContext(), client,
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, namedWriteableRegistry);
AsyncSearchMaintenanceService maintenanceService =
new AsyncSearchMaintenanceService(clusterService, nodeEnvironment.nodeId(), settings, threadPool, indexService);
return Collections.singletonList(maintenanceService);
} else {
return Collections.emptyList();
}
}

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
Expand All @@ -50,7 +51,7 @@
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.xpack.core.XPackPlugin.ASYNC_RESULTS_INDEX;
import static org.elasticsearch.xpack.search.AsyncSearchMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
import static org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService.ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

Expand Down Expand Up @@ -79,7 +80,7 @@ public List<AggregationSpec> getAggregations() {

@Before
public void startMaintenanceService() {
for (AsyncSearchMaintenanceService service : internalCluster().getDataNodeInstances(AsyncSearchMaintenanceService.class)) {
for (AsyncTaskMaintenanceService service : internalCluster().getDataNodeInstances(AsyncTaskMaintenanceService.class)) {
if (service.lifecycleState() == Lifecycle.State.STOPPED) {
// force the service to start again
service.start();
Expand All @@ -91,7 +92,7 @@ public void startMaintenanceService() {

@After
public void stopMaintenanceService() {
for (AsyncSearchMaintenanceService service : internalCluster().getDataNodeInstances(AsyncSearchMaintenanceService.class)) {
for (AsyncTaskMaintenanceService service : internalCluster().getDataNodeInstances(AsyncTaskMaintenanceService.class)) {
service.stop();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,15 @@
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackUsageAction;
import org.elasticsearch.xpack.core.action.XPackUsageResponse;
import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.rest.action.RestReloadAnalyzersAction;
import org.elasticsearch.xpack.core.rest.action.RestXPackInfoAction;
import org.elasticsearch.xpack.core.rest.action.RestXPackUsageAction;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.security.authc.TokenMetadata;
import org.elasticsearch.xpack.core.ssl.SSLConfiguration;
import org.elasticsearch.xpack.core.ssl.SSLConfigurationReloader;
Expand All @@ -88,6 +91,8 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

public class XPackPlugin extends XPackClientPlugin implements ExtensiblePlugin, RepositoryPlugin, EnginePlugin {

private static final Logger logger = LogManager.getLogger(XPackPlugin.class);
Expand Down Expand Up @@ -248,6 +253,16 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
components.add(getLicenseService());
components.add(getLicenseState());

if (DiscoveryNode.isDataNode(environment.settings())) {
// only data nodes should be eligible to run the maintenance service.
AsyncTaskIndexService<AsyncSearchResponse> indexService =
new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService, threadPool.getThreadContext(), client,
ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, namedWriteableRegistry);
AsyncTaskMaintenanceService maintenanceService =
new AsyncTaskMaintenanceService(clusterService, nodeEnvironment.nodeId(), settings, threadPool, indexService);
components.add(maintenanceService);
}

return components;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.gateway.GatewayService;
Expand All @@ -23,6 +25,7 @@
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackPlugin;

import java.io.IOException;

Expand All @@ -34,7 +37,17 @@
* Since we will have several injected implementation of this class injected into different transports, and we bind components created
* by {@linkplain org.elasticsearch.plugins.Plugin#createComponents} to their classes, we need to implement one class per binding.
*/
public abstract class AsyncTaskMaintenanceService extends AbstractLifecycleComponent implements ClusterStateListener {
public class AsyncTaskMaintenanceService extends AbstractLifecycleComponent implements ClusterStateListener {

/**
* Controls the interval at which the cleanup is scheduled.
* Defaults to 1h. It is an undocumented/expert setting that
* is mainly used by integration tests to make the garbage
* collection of search responses more reactive.
*/
public static final Setting<TimeValue> ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING =
Setting.timeSetting("async_search.index_cleanup_interval", TimeValue.timeValueHours(1), Setting.Property.NodeScope);

private static final Logger logger = LogManager.getLogger(AsyncTaskMaintenanceService.class);

private final ClusterService clusterService;
Expand All @@ -48,17 +61,16 @@ public abstract class AsyncTaskMaintenanceService extends AbstractLifecycleCompo
private volatile Scheduler.Cancellable cancellable;

public AsyncTaskMaintenanceService(ClusterService clusterService,
String index,
String localNodeId,
Settings nodeSettings,
ThreadPool threadPool,
AsyncTaskIndexService<?> indexService,
TimeValue delay) {
AsyncTaskIndexService<?> indexService) {
this.clusterService = clusterService;
this.index = index;
this.index = XPackPlugin.ASYNC_RESULTS_INDEX;
this.localNodeId = localNodeId;
this.threadPool = threadPool;
this.indexService = indexService;
this.delay = delay;
this.delay = ASYNC_SEARCH_CLEANUP_INTERVAL_SETTING.get(nodeSettings);
}


Expand Down

0 comments on commit cc05e1b

Please sign in to comment.