From ffa1b96962d6b9ab2fd475978bd5b5ada6371988 Mon Sep 17 00:00:00 2001 From: Rory Hunter Date: Wed, 9 Dec 2020 19:56:11 +0000 Subject: [PATCH 1/3] Migrate async search to use an auto-created system index --- .../xpack/async/AsyncResultsIndexPlugin.java | 24 +++++++- .../core/async/AsyncTaskIndexService.java | 58 ++++--------------- .../core/async/AsyncTaskServiceTests.java | 19 +++--- 3 files changed, 42 insertions(+), 59 deletions(-) diff --git a/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java b/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java index faa1628cfbd68..13e716045c20d 100644 --- a/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java +++ b/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.indices.SystemIndexDescriptor; @@ -27,13 +28,15 @@ import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.function.Supplier; import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.LOGSTASH_MANAGEMENT_ORIGIN; public class AsyncResultsIndexPlugin extends Plugin implements SystemIndexPlugin { @@ -45,7 +48,24 @@ public AsyncResultsIndexPlugin(Settings settings) { @Override public Collection getSystemIndexDescriptors(Settings settings) { - return Collections.singletonList(new SystemIndexDescriptor(XPackPlugin.ASYNC_RESULTS_INDEX, this.getClass().getSimpleName())); + final XContentBuilder mappings; + try { + mappings = AsyncTaskIndexService.mappings(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to build " + XPackPlugin.ASYNC_RESULTS_INDEX + " index mappings", e); + } + + return List.of( + SystemIndexDescriptor.builder() + .setIndexPattern(XPackPlugin.ASYNC_RESULTS_INDEX) + .setDescription(this.getClass().getSimpleName()) + .setPrimaryIndex(XPackPlugin.ASYNC_RESULTS_INDEX) + .setMappings(mappings) + .setSettings(AsyncTaskIndexService.settings()) + .setVersionMetaKey("version") + .setOrigin(LOGSTASH_MANAGEMENT_ORIGIN) + .build() + ); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index 1ccc8d1e878a0..b58087408e66b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -5,10 +5,6 @@ */ package org.elasticsearch.xpack.core.async; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -60,14 +56,17 @@ * A service that exposes the CRUD operations for the async task-specific index. */ public final class AsyncTaskIndexService> { - private static final Logger logger = LogManager.getLogger(AsyncTaskIndexService.class); public static final String HEADERS_FIELD = "headers"; public static final String RESPONSE_HEADERS_FIELD = "response_headers"; public static final String EXPIRATION_TIME_FIELD = "expiration_time"; public static final String RESULT_FIELD = "result"; - static Settings settings() { + // Usually the settings and mappings below would be co-located with the SystemIndexPlugin implementation, + // however in this case this service is in a different project to AsyncResultsIndexPlugin, as are tests + // that need access to #settings(). + + public static Settings settings() { return Settings.builder() .put("index.codec", "best_compression") .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) @@ -76,7 +75,7 @@ static Settings settings() { .build(); } - static XContentBuilder mappings() throws IOException { + public static XContentBuilder mappings() throws IOException { XContentBuilder builder = jsonBuilder() .startObject() .startObject(SINGLE_MAPPING_NAME) @@ -107,13 +106,11 @@ static XContentBuilder mappings() throws IOException { } private final String index; - private final ClusterService clusterService; private final Client client; private final SecurityContext securityContext; private final NamedWriteableRegistry registry; private final Writeable.Reader reader; - public AsyncTaskIndexService(String index, ClusterService clusterService, ThreadContext threadContext, @@ -122,7 +119,6 @@ public AsyncTaskIndexService(String index, Writeable.Reader reader, NamedWriteableRegistry registry) { this.index = index; - this.clusterService = clusterService; this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext); this.client = new OriginSettingClient(client, origin); this.registry = registry; @@ -136,34 +132,6 @@ public Client getClient() { return client; } - /** - * Creates the index with the expected settings and mappings if it doesn't exist. - */ - void createIndexIfNecessary(ActionListener listener) { - if (clusterService.state().routingTable().hasIndex(index) == false) { - try { - client.admin().indices().prepareCreate(index) - .setSettings(settings()) - .setMapping(mappings()) - .execute(ActionListener.wrap( - resp -> listener.onResponse(null), - exc -> { - if (ExceptionsHelper.unwrapCause(exc) instanceof ResourceAlreadyExistsException) { - listener.onResponse(null); - } else { - logger.error("failed to create " + index + " index", exc); - listener.onFailure(exc); - } - })); - } catch (Exception exc) { - logger.error("failed to create " + index + " index", exc); - listener.onFailure(exc); - } - } else { - listener.onResponse(null); - } - } - /** * Stores the initial response with the original headers of the authenticated user * and the expected expiration time. @@ -180,7 +148,7 @@ public void createResponse(String docId, .create(true) .id(docId) .source(source, XContentType.JSON); - createIndexIfNecessary(ActionListener.wrap(v -> client.index(indexRequest, listener), listener::onFailure)); + client.index(indexRequest, listener); } /** @@ -199,9 +167,7 @@ public void updateResponse(String docId, .id(docId) .doc(source, XContentType.JSON) .retryOnConflict(5); - // updates create the index automatically if it doesn't exist so we force the creation - // preemptively. - createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure)); + client.update(request, listener); } catch(Exception e) { listener.onFailure(e); } @@ -219,9 +185,7 @@ public void updateExpirationTime(String docId, .id(docId) .doc(source, XContentType.JSON) .retryOnConflict(5); - // updates create the index automatically if it doesn't exist so we force the creation - // preemptively. - createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure)); + client.update(request, listener); } /** @@ -231,9 +195,7 @@ public void deleteResponse(AsyncExecutionId asyncExecutionId, ActionListener listener) { try { DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId()); - // deletes create the index automatically if it doesn't exist so we force the creation - // preemptively. - createIndexIfNecessary(ActionListener.wrap(v -> client.delete(request, listener), listener::onFailure)); + client.delete(request, listener); } catch(Exception e) { listener.onFailure(e); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java index 99b538da7dd99..1428d457caed5 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java @@ -99,14 +99,15 @@ public void testEnsuredAuthenticatedUserIsSame() throws IOException { } public void testAutoCreateIndex() throws Exception { - { - PlainActionFuture future = PlainActionFuture.newFuture(); - indexService.createIndexIfNecessary(future); - future.get(); - assertSettings(); - } - AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get(); - assertTrue(ack.isAcknowledged()); + // TODO +// { +// PlainActionFuture future = PlainActionFuture.newFuture(); +// indexService.createIndexIfNecessary(future); +// future.get(); +// assertSettings(); +// } +// AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get(); +// assertTrue(ack.isAcknowledged()); AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0)); AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L); @@ -116,7 +117,7 @@ public void testAutoCreateIndex() throws Exception { future.get(); assertSettings(); } - ack = client().admin().indices().prepareDelete(index).get(); + AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get(); assertTrue(ack.isAcknowledged()); { PlainActionFuture future = PlainActionFuture.newFuture(); From 4470df1ae684057f79825258aaf43d709961599e Mon Sep 17 00:00:00 2001 From: Rory Hunter Date: Thu, 10 Dec 2020 10:35:40 +0000 Subject: [PATCH 2/3] Fix tests --- .../xpack/async/AsyncResultsIndexPlugin.java | 20 +---- .../core/async/AsyncTaskIndexService.java | 80 ++++++++++++------- .../core/async/AsyncTaskServiceTests.java | 65 ++++++++++----- 3 files changed, 97 insertions(+), 68 deletions(-) diff --git a/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java b/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java index 13e716045c20d..66a5c860b2efe 100644 --- a/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java +++ b/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java @@ -36,7 +36,6 @@ import java.util.function.Supplier; import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.LOGSTASH_MANAGEMENT_ORIGIN; public class AsyncResultsIndexPlugin extends Plugin implements SystemIndexPlugin { @@ -48,24 +47,7 @@ public AsyncResultsIndexPlugin(Settings settings) { @Override public Collection getSystemIndexDescriptors(Settings settings) { - final XContentBuilder mappings; - try { - mappings = AsyncTaskIndexService.mappings(); - } catch (IOException e) { - throw new UncheckedIOException("Failed to build " + XPackPlugin.ASYNC_RESULTS_INDEX + " index mappings", e); - } - - return List.of( - SystemIndexDescriptor.builder() - .setIndexPattern(XPackPlugin.ASYNC_RESULTS_INDEX) - .setDescription(this.getClass().getSimpleName()) - .setPrimaryIndex(XPackPlugin.ASYNC_RESULTS_INDEX) - .setMappings(mappings) - .setSettings(AsyncTaskIndexService.settings()) - .setVersionMetaKey("version") - .setOrigin(LOGSTASH_MANAGEMENT_ORIGIN) - .build() - ); + return List.of(AsyncTaskIndexService.getSystemIndexDescriptor()); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java index b58087408e66b..bc3da1e3015a6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java @@ -31,8 +31,10 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.indices.SystemIndexDescriptor; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.xpack.core.XPackPlugin; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse; import org.elasticsearch.xpack.core.security.SecurityContext; @@ -40,6 +42,7 @@ import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.Base64; import java.util.Collections; @@ -50,6 +53,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN; import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.AUTHENTICATION_KEY; /** @@ -62,11 +66,13 @@ public final class AsyncTaskIndexService> { public static final String EXPIRATION_TIME_FIELD = "expiration_time"; public static final String RESULT_FIELD = "result"; - // Usually the settings and mappings below would be co-located with the SystemIndexPlugin implementation, - // however in this case this service is in a different project to AsyncResultsIndexPlugin, as are tests - // that need access to #settings(). + // Usually the settings, mappings and system index descriptor below + // would be co-located with the SystemIndexPlugin implementation, + // however in this case this service is in a different project to + // AsyncResultsIndexPlugin, as are tests that need access to + // #settings(). - public static Settings settings() { + static Settings settings() { return Settings.builder() .put("index.codec", "best_compression") .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) @@ -75,34 +81,50 @@ public static Settings settings() { .build(); } - public static XContentBuilder mappings() throws IOException { - XContentBuilder builder = jsonBuilder() - .startObject() - .startObject(SINGLE_MAPPING_NAME) - .startObject("_meta") - .field("version", Version.CURRENT) - .endObject() - .field("dynamic", "strict") - .startObject("properties") - .startObject(HEADERS_FIELD) - .field("type", "object") - .field("enabled", "false") - .endObject() - .startObject(RESPONSE_HEADERS_FIELD) - .field("type", "object") - .field("enabled", "false") - .endObject() - .startObject(RESULT_FIELD) - .field("type", "object") - .field("enabled", "false") + private static XContentBuilder mappings() { + try { + XContentBuilder builder = jsonBuilder() + .startObject() + .startObject(SINGLE_MAPPING_NAME) + .startObject("_meta") + .field("version", Version.CURRENT) .endObject() - .startObject(EXPIRATION_TIME_FIELD) - .field("type", "long") + .field("dynamic", "strict") + .startObject("properties") + .startObject(HEADERS_FIELD) + .field("type", "object") + .field("enabled", "false") + .endObject() + .startObject(RESPONSE_HEADERS_FIELD) + .field("type", "object") + .field("enabled", "false") + .endObject() + .startObject(RESULT_FIELD) + .field("type", "object") + .field("enabled", "false") + .endObject() + .startObject(EXPIRATION_TIME_FIELD) + .field("type", "long") + .endObject() .endObject() .endObject() - .endObject() - .endObject(); - return builder; + .endObject(); + return builder; + } catch (IOException e) { + throw new UncheckedIOException("Failed to build mappings for " + XPackPlugin.ASYNC_RESULTS_INDEX, e); + } + } + + public static SystemIndexDescriptor getSystemIndexDescriptor() { + return SystemIndexDescriptor.builder() + .setIndexPattern(XPackPlugin.ASYNC_RESULTS_INDEX) + .setDescription("Async search results") + .setPrimaryIndex(XPackPlugin.ASYNC_RESULTS_INDEX) + .setMappings(mappings()) + .setSettings(settings()) + .setVersionMetaKey("version") + .setOrigin(ASYNC_SEARCH_ORIGIN) + .build(); } private final String index; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java index 1428d457caed5..66fa5da060072 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncTaskServiceTests.java @@ -15,6 +15,9 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.transport.TransportService; @@ -24,7 +27,10 @@ import org.junit.Before; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.List; // TODO: test CRUD operations public class AsyncTaskServiceTests extends ESSingleNodeTestCase { @@ -41,6 +47,23 @@ public void setup() { client(), "test_origin", AsyncSearchResponse::new, writableRegistry()); } + @Override + protected Collection> getPlugins() { + List> plugins = new ArrayList<>(super.getPlugins()); + plugins.add(TestPlugin.class); + return plugins; + } + + /** + * This class exists because AsyncResultsIndexPlugin exists in a different x-pack module. + */ + public static class TestPlugin extends Plugin implements SystemIndexPlugin { + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + return List.of(AsyncTaskIndexService.getSystemIndexDescriptor()); + } + } + public void testEnsuredAuthenticatedUserIsSame() throws IOException { Authentication original = new Authentication(new User("test", "role"), new Authentication.RealmRef("realm", "file", "node"), null); @@ -99,16 +122,7 @@ public void testEnsuredAuthenticatedUserIsSame() throws IOException { } public void testAutoCreateIndex() throws Exception { - // TODO -// { -// PlainActionFuture future = PlainActionFuture.newFuture(); -// indexService.createIndexIfNecessary(future); -// future.get(); -// assertSettings(); -// } -// AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get(); -// assertTrue(ack.isAcknowledged()); - + // To begin with, the results index should be auto-created. AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0)); AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L); { @@ -117,37 +131,48 @@ public void testAutoCreateIndex() throws Exception { future.get(); assertSettings(); } + + // Delete the index, so we can test subsequent auto-create behaviour AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get(); assertTrue(ack.isAcknowledged()); + + // Subsequent response deletes throw a (wrapped) index not found exception { PlainActionFuture future = PlainActionFuture.newFuture(); indexService.deleteResponse(id, future); - future.get(); - assertSettings(); + expectThrows(Exception.class, future::get); } - ack = client().admin().indices().prepareDelete(index).get(); - assertTrue(ack.isAcknowledged()); + + // So do updates { PlainActionFuture future = PlainActionFuture.newFuture(); indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, future); - expectThrows(Exception.class, () -> future.get()); + expectThrows(Exception.class, future::get); assertSettings(); } - ack = client().admin().indices().prepareDelete(index).get(); - assertTrue(ack.isAcknowledged()); + + // And so does updating the expiration time { PlainActionFuture future = PlainActionFuture.newFuture(); indexService.updateExpirationTime("0", 10L, future); - expectThrows(Exception.class, () -> future.get()); + expectThrows(Exception.class, future::get); + assertSettings(); + } + + // But the index is still auto-created + { + PlainActionFuture future = PlainActionFuture.newFuture(); + indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future); + future.get(); assertSettings(); } } - private void assertSettings() throws IOException { + private void assertSettings() { GetIndexResponse getIndexResponse = client().admin().indices().getIndex( new GetIndexRequest().indices(index)).actionGet(); Settings settings = getIndexResponse.getSettings().get(index); Settings expected = AsyncTaskIndexService.settings(); - assertEquals(expected, settings.filter(key -> expected.hasValue(key))); + assertEquals(expected, settings.filter(expected::hasValue)); } } From 3982d5954f69d2d99f93804dd48211ebe2c10ecb Mon Sep 17 00:00:00 2001 From: Rory Hunter Date: Thu, 10 Dec 2020 15:09:35 +0000 Subject: [PATCH 3/3] Imports --- .../org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java b/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java index 66a5c860b2efe..4bafef96556ef 100644 --- a/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java +++ b/x-pack/plugin/async/src/main/java/org/elasticsearch/xpack/async/AsyncResultsIndexPlugin.java @@ -13,7 +13,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.indices.SystemIndexDescriptor; @@ -28,8 +27,6 @@ import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService; import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.List;