Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate async search to use an auto-created system index #66154

Merged
merged 3 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.indices.SystemIndexDescriptor;
Expand All @@ -27,9 +28,10 @@
import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

Expand All @@ -45,7 +47,7 @@ public AsyncResultsIndexPlugin(Settings settings) {

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(new SystemIndexDescriptor(XPackPlugin.ASYNC_RESULTS_INDEX, this.getClass().getSimpleName()));
return List.of(AsyncTaskIndexService.getSystemIndexDescriptor());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
*/
package org.elasticsearch.xpack.core.async;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand All @@ -35,15 +31,18 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
Expand All @@ -54,19 +53,25 @@

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.AUTHENTICATION_KEY;

/**
* A service that exposes the CRUD operations for the async task-specific index.
*/
public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
private static final Logger logger = LogManager.getLogger(AsyncTaskIndexService.class);

public static final String HEADERS_FIELD = "headers";
public static final String RESPONSE_HEADERS_FIELD = "response_headers";
public static final String EXPIRATION_TIME_FIELD = "expiration_time";
public static final String RESULT_FIELD = "result";

// Usually the settings, mappings and system index descriptor below
// would be co-located with the SystemIndexPlugin implementation,
// however in this case this service is in a different project to
// AsyncResultsIndexPlugin, as are tests that need access to
// #settings().

static Settings settings() {
return Settings.builder()
.put("index.codec", "best_compression")
Expand All @@ -76,44 +81,58 @@ static Settings settings() {
.build();
}

static XContentBuilder mappings() throws IOException {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(SINGLE_MAPPING_NAME)
.startObject("_meta")
.field("version", Version.CURRENT)
.endObject()
.field("dynamic", "strict")
.startObject("properties")
.startObject(HEADERS_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(RESPONSE_HEADERS_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(RESULT_FIELD)
.field("type", "object")
.field("enabled", "false")
private static XContentBuilder mappings() {
try {
XContentBuilder builder = jsonBuilder()
.startObject()
.startObject(SINGLE_MAPPING_NAME)
.startObject("_meta")
.field("version", Version.CURRENT)
.endObject()
.startObject(EXPIRATION_TIME_FIELD)
.field("type", "long")
.field("dynamic", "strict")
.startObject("properties")
.startObject(HEADERS_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(RESPONSE_HEADERS_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(RESULT_FIELD)
.field("type", "object")
.field("enabled", "false")
.endObject()
.startObject(EXPIRATION_TIME_FIELD)
.field("type", "long")
.endObject()
.endObject()
.endObject()
.endObject()
.endObject();
return builder;
.endObject();
return builder;
} catch (IOException e) {
throw new UncheckedIOException("Failed to build mappings for " + XPackPlugin.ASYNC_RESULTS_INDEX, e);
}
}

public static SystemIndexDescriptor getSystemIndexDescriptor() {
return SystemIndexDescriptor.builder()
.setIndexPattern(XPackPlugin.ASYNC_RESULTS_INDEX)
.setDescription("Async search results")
.setPrimaryIndex(XPackPlugin.ASYNC_RESULTS_INDEX)
.setMappings(mappings())
.setSettings(settings())
.setVersionMetaKey("version")
.setOrigin(ASYNC_SEARCH_ORIGIN)
.build();
}

private final String index;
private final ClusterService clusterService;
private final Client client;
private final SecurityContext securityContext;
private final NamedWriteableRegistry registry;
private final Writeable.Reader<R> reader;


public AsyncTaskIndexService(String index,
ClusterService clusterService,
ThreadContext threadContext,
Expand All @@ -122,7 +141,6 @@ public AsyncTaskIndexService(String index,
Writeable.Reader<R> reader,
NamedWriteableRegistry registry) {
this.index = index;
this.clusterService = clusterService;
this.securityContext = new SecurityContext(clusterService.getSettings(), threadContext);
this.client = new OriginSettingClient(client, origin);
this.registry = registry;
Expand All @@ -136,34 +154,6 @@ public Client getClient() {
return client;
}

/**
* Creates the index with the expected settings and mappings if it doesn't exist.
*/
void createIndexIfNecessary(ActionListener<Void> listener) {
if (clusterService.state().routingTable().hasIndex(index) == false) {
try {
client.admin().indices().prepareCreate(index)
.setSettings(settings())
.setMapping(mappings())
.execute(ActionListener.wrap(
resp -> listener.onResponse(null),
exc -> {
if (ExceptionsHelper.unwrapCause(exc) instanceof ResourceAlreadyExistsException) {
listener.onResponse(null);
} else {
logger.error("failed to create " + index + " index", exc);
listener.onFailure(exc);
}
}));
} catch (Exception exc) {
logger.error("failed to create " + index + " index", exc);
listener.onFailure(exc);
}
} else {
listener.onResponse(null);
}
}

/**
* Stores the initial response with the original headers of the authenticated user
* and the expected expiration time.
Expand All @@ -180,7 +170,7 @@ public void createResponse(String docId,
.create(true)
.id(docId)
.source(source, XContentType.JSON);
createIndexIfNecessary(ActionListener.wrap(v -> client.index(indexRequest, listener), listener::onFailure));
client.index(indexRequest, listener);
}

/**
Expand All @@ -199,9 +189,7 @@ public void updateResponse(String docId,
.id(docId)
.doc(source, XContentType.JSON)
.retryOnConflict(5);
// updates create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
client.update(request, listener);
} catch(Exception e) {
listener.onFailure(e);
}
Expand All @@ -219,9 +207,7 @@ public void updateExpirationTime(String docId,
.id(docId)
.doc(source, XContentType.JSON)
.retryOnConflict(5);
// updates create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.update(request, listener), listener::onFailure));
client.update(request, listener);
}

/**
Expand All @@ -231,9 +217,7 @@ public void deleteResponse(AsyncExecutionId asyncExecutionId,
ActionListener<DeleteResponse> listener) {
try {
DeleteRequest request = new DeleteRequest(index).id(asyncExecutionId.getDocId());
// deletes create the index automatically if it doesn't exist so we force the creation
// preemptively.
createIndexIfNecessary(ActionListener.wrap(v -> client.delete(request, listener), listener::onFailure));
client.delete(request, listener);
} catch(Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.TransportService;
Expand All @@ -24,7 +27,10 @@
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// TODO: test CRUD operations
public class AsyncTaskServiceTests extends ESSingleNodeTestCase {
Expand All @@ -41,6 +47,23 @@ public void setup() {
client(), "test_origin", AsyncSearchResponse::new, writableRegistry());
}

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.getPlugins());
plugins.add(TestPlugin.class);
return plugins;
}

/**
* This class exists because AsyncResultsIndexPlugin exists in a different x-pack module.
*/
public static class TestPlugin extends Plugin implements SystemIndexPlugin {
@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return List.of(AsyncTaskIndexService.getSystemIndexDescriptor());
}
}

public void testEnsuredAuthenticatedUserIsSame() throws IOException {
Authentication original =
new Authentication(new User("test", "role"), new Authentication.RealmRef("realm", "file", "node"), null);
Expand Down Expand Up @@ -99,15 +122,7 @@ public void testEnsuredAuthenticatedUserIsSame() throws IOException {
}

public void testAutoCreateIndex() throws Exception {
{
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
indexService.createIndexIfNecessary(future);
future.get();
assertSettings();
}
AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());

// To begin with, the results index should be auto-created.
AsyncExecutionId id = new AsyncExecutionId("0", new TaskId("N/A", 0));
AsyncSearchResponse resp = new AsyncSearchResponse(id.getEncoded(), true, true, 0L, 0L);
{
Expand All @@ -116,37 +131,48 @@ public void testAutoCreateIndex() throws Exception {
future.get();
assertSettings();
}
ack = client().admin().indices().prepareDelete(index).get();

// Delete the index, so we can test subsequent auto-create behaviour
AcknowledgedResponse ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());

// Subsequent response deletes throw a (wrapped) index not found exception
{
PlainActionFuture<DeleteResponse> future = PlainActionFuture.newFuture();
indexService.deleteResponse(id, future);
future.get();
assertSettings();
expectThrows(Exception.class, future::get);
}
ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());

// So do updates
{
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
indexService.updateResponse(id.getDocId(), Collections.emptyMap(), resp, future);
expectThrows(Exception.class, () -> future.get());
expectThrows(Exception.class, future::get);
assertSettings();
}
ack = client().admin().indices().prepareDelete(index).get();
assertTrue(ack.isAcknowledged());

// And so does updating the expiration time
{
PlainActionFuture<UpdateResponse> future = PlainActionFuture.newFuture();
indexService.updateExpirationTime("0", 10L, future);
expectThrows(Exception.class, () -> future.get());
expectThrows(Exception.class, future::get);
assertSettings();
}

// But the index is still auto-created
{
PlainActionFuture<IndexResponse> future = PlainActionFuture.newFuture();
indexService.createResponse(id.getDocId(), Collections.emptyMap(), resp, future);
future.get();
assertSettings();
}
}

private void assertSettings() throws IOException {
private void assertSettings() {
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(
new GetIndexRequest().indices(index)).actionGet();
Settings settings = getIndexResponse.getSettings().get(index);
Settings expected = AsyncTaskIndexService.settings();
assertEquals(expected, settings.filter(key -> expected.hasValue(key)));
assertEquals(expected, settings.filter(expected::hasValue));
}
}