diff --git a/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java b/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java index faa1628cfbd68..4bafef96556ef 100644 --- a/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java +++ b/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.function.Supplier; @@ -45,7 +44,7 @@ public AsyncResultsIndexPlugin(Settings settings) { @Override public Collection getSystemIndexDescriptors(Settings settings) { - return Collections.singletonList(new SystemIndexDescriptor(XPackPlugin.ASYNC_RESULTS_INDEX, this.getClass().getSimpleName())); + return List.of(AsyncTaskIndexService.getSystemIndexDescriptor()); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 1ccc8d1e878a0..bc3da1e3015a6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -5,10 +5,6 @@ */ package org.elasticsearch.xpack.core.async; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -35,8 +31,10 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -44,6 +42,7 @@ import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.Base64; import java.util.Collections; @@ -54,19 +53,25 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.AUTHENTICATION_KEY; /** * A service that exposes the CRUD operations for the async task-specific index. */ public final class AsyncTaskIndexService> { - private static final Logger logger = LogManager.getLogger(AsyncTaskIndexService.class); public static final String HEADERS_FIELD = "headers"; public static final String RESPONSE_HEADERS_FIELD = "response_headers"; public static final String EXPIRATION_TIME_FIELD = "expiration_time"; public static final String RESULT_FIELD = "result"; + // Usually the settings, mappings and system index descriptor below + // would be co-located with the SystemIndexPlugin implementation, + // however in this case this service is in a different project to + // AsyncResultsIndexPlugin, as are tests that need access to + // #settings(). + static Settings settings() { return Settings.builder() .put("index.codec", "best_compression") @@ -76,44 +81,58 @@ static Settings settings() { .build(); } - static XContentBuilder mappings() throws IOException { - XContentBuilder builder = jsonBuilder() - .startObject() - .startObject(SINGLE_MAPPING_NAME) - .startObject("_meta") - .field("version", Version.CURRENT) - .endObject() - .field("dynamic", "strict") - .startObject("properties") - .startObject(HEADERS_FIELD) - .field("type", "object") - .field("enabled", "false") - .endObject() - .startObject(RESPONSE_HEADERS_FIELD) - .field("type", "object") - .field("enabled", "false") - .endObject() - .startObject(RESULT_FIELD) - .field("type", "object") - .field("enabled", "false") + private static XContentBuilder mappings() { + try { + XContentBuilder builder = jsonBuilder() + .startObject() + .startObject(SINGLE_MAPPING_NAME) + .startObject("_meta") + .field("version", Version.CURRENT) .endObject() - .startObject(EXPIRATION_TIME_FIELD) - .field("type", "long") + .field("dynamic", "strict") + .startObject("properties") + .startObject(HEADERS_FIELD) + .field("type", "object") + .field("enabled", "false") + .endObject() + .startObject(RESPONSE_HEADERS_FIELD) + .field("type", "object") + .field("enabled", "false") + .endObject() + .startObject(RESULT_FIELD) + .field("type", "object") + .field("enabled", "false") + .endObject() + .startObject(EXPIRATION_TIME_FIELD) + .field("type", "long") + .endObject() .endObject() .endObject() - .endObject() - .endObject(); - return builder; + .endObject(); + return builder; + } catch (IOException e) { + throw new UncheckedIOException("Failed to build mappings for " + XPackPlugin.ASYNC_RESULTS_INDEX, e); + } + } + + public static SystemIndexDescriptor getSystemIndexDescriptor() { + return SystemIndexDescriptor.builder() + .setIndexPattern(XPackPlugin.ASYNC_RESULTS_INDEX) + .setDescription("Async search results") + .setPrimaryIndex(XPackPlugin.ASYNC_RESULTS_INDEX) + .setMappings(mappings()) + .setSettings(settings()) + .setVersionMetaKey("version") + .setOrigin(ASYNC_SEARCH_ORIGIN) + .build(); } private final String index; - private final ClusterService clusterService; private final Client client; private final SecurityContext securityContext; private final NamedWriteableRegistry registry; private final Writeable.Reader reader; - public AsyncTaskIndexService(String index, ClusterService clusterService, ThreadContext threadContext, @@ -122,7 +141,6 @@ public AsyncTaskIndexService(String index, Writeable.Reader reader, NamedWriteableRegistry registry) { this.index = index; - this.clusterService = clusterService; this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext); this.client = new OriginSettingClient(client, origin); this.registry = registry; @@ -136,34 +154,6 @@ public Client getClient() { return client; } - /** - * Creates the index with the expected settings and mappings if it doesn't exist. - */ - void createIndexIfNecessary(ActionListener listener) { - if (clusterService.state().routingTable().hasIndex(index) == false) { - try { - client.admin().indices().prepareCreate(index) - .setSettings(settings()) - .setMapping(mappings()) - .execute(ActionListener.wrap( - resp -> listener.onResponse(null), - exc -> { - if (ExceptionsHelper.unwrapCause(exc) instanceof ResourceAlreadyExistsException) { - listener.onResponse(null); - } else { - logger.error("failed to create " + index + " index", exc); - listener.onFailure(exc); - } - })); - } catch (Exception exc) { - logger.error("failed to create " + index + " index", exc); - listener.onFailure(exc); - } - } else { - listener.onResponse(null); - } - } - /** * Stores the initial response with the original headers of the authenticated user * and the expected expiration time. @@ -180,7 +170,7 @@ public void createResponse(String docId, .create(true) .id(docId) .source(source, XContentType.JSON); - createIndexIfNecessary(ActionListener.wrap(v -> client.index(indexRequest, listener), listener::onFailure)); + client.index(indexRequest, listener); } /** @@ -199,9 +189,7 @@ public void updateResponse(String docId, .id(docId) .doc(source, XContentType.JSON) .retryOnConflict(5); - // updates create the index automatically if it doesn't exist so we force the creation - // preemptively. - createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure)); + client.update(request, listener); } catch(Exception e) { listener.onFailure(e); } @@ -219,9 +207,7 @@ public void updateExpirationTime(String docId, .id(docId) .doc(source, XContentType.JSON) .retryOnConflict(5); - // updates create the index automatically if it doesn't exist so we force the creation - // preemptively. - createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure)); + client.update(request, listener); } /** @@ -231,9 +217,7 @@ public void deleteResponse(AsyncExecutionId asyncExecutionId, ActionListener listener) { try { DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId()); - // deletes create the index automatically if it doesn't exist so we force the creation - // preemptively. - createIndexIfNecessary(ActionListener.wrap(v -> client.delete(request, listener), listener::onFailure)); + client.delete(request, listener); } catch(Exception e) { listener.onFailure(e); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java index 99b538da7dd99..66fa5da060072 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java @@ -15,6 +15,9 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.transport.TransportService; @@ -24,7 +27,10 @@ import org.junit.Before; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.List; // TODO: test CRUD operations public class AsyncTaskServiceTests extends ESSingleNodeTestCase { @@ -41,6 +47,23 @@ public void setup() { client(), "test_origin", AsyncSearchResponse::new, writableRegistry()); } + @Override + protected Collection> getPlugins() { + List> plugins = new ArrayList<>(super.getPlugins()); + plugins.add(TestPlugin.class); + return plugins; + } + + /** + * This class exists because AsyncResultsIndexPlugin exists in a different x-pack module. + */ + public static class TestPlugin extends Plugin implements SystemIndexPlugin { + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + return List.of(AsyncTaskIndexService.getSystemIndexDescriptor()); + } + } + public void testEnsuredAuthenticatedUserIsSame() throws IOException { Authentication original = new Authentication(new User("test", "role"), new Authentication.RealmRef("realm", "file", "node"), null); @@ -99,15 +122,7 @@ public void testEnsuredAuthenticatedUserIsSame() throws IOException { } public void testAutoCreateIndex() throws Exception { - { - PlainActionFuture future = PlainActionFuture.newFuture(); - indexService.createIndexIfNecessary(future); - future.get(); - assertSettings(); - } - AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get(); - assertTrue(ack.isAcknowledged()); - + // To begin with, the results index should be auto-created. AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0)); AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L); { @@ -116,37 +131,48 @@ public void testAutoCreateIndex() throws Exception { future.get(); assertSettings(); } - ack = client().admin().indices().prepareDelete(index).get(); + + // Delete the index, so we can test subsequent auto-create behaviour + AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get(); assertTrue(ack.isAcknowledged()); + + // Subsequent response deletes throw a (wrapped) index not found exception { PlainActionFuture future = PlainActionFuture.newFuture(); indexService.deleteResponse(id, future); - future.get(); - assertSettings(); + expectThrows(Exception.class, future::get); } - ack = client().admin().indices().prepareDelete(index).get(); - assertTrue(ack.isAcknowledged()); + + // So do updates { PlainActionFuture future = PlainActionFuture.newFuture(); indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, future); - expectThrows(Exception.class, () -> future.get()); + expectThrows(Exception.class, future::get); assertSettings(); } - ack = client().admin().indices().prepareDelete(index).get(); - assertTrue(ack.isAcknowledged()); + + // And so does updating the expiration time { PlainActionFuture future = PlainActionFuture.newFuture(); indexService.updateExpirationTime("0", 10L, future); - expectThrows(Exception.class, () -> future.get()); + expectThrows(Exception.class, future::get); + assertSettings(); + } + + // But the index is still auto-created + { + PlainActionFuture future = PlainActionFuture.newFuture(); + indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future); + future.get(); assertSettings(); } } - private void assertSettings() throws IOException { + private void assertSettings() { GetIndexResponse getIndexResponse = client().admin().indices().getIndex( new GetIndexRequest().indices(index)).actionGet(); Settings settings = getIndexResponse.getSettings().get(index); Settings expected = AsyncTaskIndexService.settings(); - assertEquals(expected, settings.filter(key -> expected.hasValue(key))); + assertEquals(expected, settings.filter(expected::hasValue)); } }