Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reindex resolve indices early #49850

Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ private static <T> List<T> parseArray(XContentParser parser, IOSupplier<T> suppl
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
if (parser.currentToken().isValue()
|| parser.currentToken() == XContentParser.Token.VALUE_NULL
|| parser.currentToken() == XContentParser.Token.START_ARRAY
|| parser.currentToken() == XContentParser.Token.START_OBJECT) {
list.add(supplier.get());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,30 +585,39 @@ class TestStruct {
public void testArraysOfGenericValues() throws IOException {
XContentParser parser = createParser(
JsonXContent.jsonXContent,
"{ \"test_array\": [ 1, null, \"3\", 4.2], \"int_array\": [ 1, 2, 3] }"
"{ \"test_array\": [ 1, null, \"3\", 4.2], \"int_array\": [ 1, 2, 3], \"multi_array\": [ [1, 2], [3, 4] ] }"
);
class TestStruct {
List<Object> testArray = new ArrayList<>();

List<Integer> ints = new ArrayList<>();

List<List<Object> > multis = new ArrayList<>();

public void setInts(List<Integer> ints) {
this.ints = ints;
}

public void setArray(List<Object> testArray) {
this.testArray = testArray;
}

public void setMultis(List<List<Object>> multis) {
this.multis = multis;
}
}
ObjectParser<TestStruct, Void> objectParser = new ObjectParser<>("foo");
TestStruct s = new TestStruct();

objectParser.declareFieldArray(TestStruct::setArray, (p, c) -> XContentParserUtils.parseFieldsValue(p),
new ParseField("test_array"), ValueType.VALUE_ARRAY);
objectParser.declareIntArray(TestStruct::setInts, new ParseField("int_array"));
objectParser.declareFieldArray(TestStruct::setMultis, (p, c) -> p.list(),
new ParseField("multi_array"), ValueType.OBJECT_ARRAY);
objectParser.parse(parser, s, null);
assertEquals(s.testArray, Arrays.asList(1, null, "3", 4.2));
assertEquals(s.ints, Arrays.asList(1, 2, 3));
assertEquals(s.multis, Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)));

parser = createParser(JsonXContent.jsonXContent, "{\"test_array\": 42}");
s = new TestStruct();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.elasticsearch.persistent.PersistentTaskParams;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

public class ReindexTaskParams implements PersistentTaskParams {
Expand All @@ -37,26 +39,32 @@ public class ReindexTaskParams implements PersistentTaskParams {

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<ReindexTaskParams, Void> PARSER
= new ConstructingObjectParser<>(NAME, a -> new ReindexTaskParams((Boolean) a[0], (Map<String, String>) a[1]));
= new ConstructingObjectParser<>(NAME, a -> new ReindexTaskParams((Boolean) a[0], (List<List<String>>) a[1],
(Map<String, String>) a[2]));

private static String STORE_RESULT = "store_result";
private static String HEADERS = "headers";
private static String INDEX_GROUPS = "index_groups";

static {
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), new ParseField(STORE_RESULT));
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> p.list(), new ParseField(INDEX_GROUPS));
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), new ParseField(HEADERS));
}

private final boolean storeResult;
private final List<? extends Collection<String>> indexGroups;
private final Map<String, String> headers;

public ReindexTaskParams(boolean storeResult, Map<String, String> headers) {
public ReindexTaskParams(boolean storeResult, List<? extends Collection<String>> indexGroups, Map<String, String> headers) {
this.storeResult = storeResult;
this.indexGroups = indexGroups;
this.headers = headers;
}

public ReindexTaskParams(StreamInput in) throws IOException {
storeResult = in.readBoolean();
indexGroups = in.readList(StreamInput::readStringList);
headers = in.readMap(StreamInput::readString, StreamInput::readString);
}

Expand All @@ -74,13 +82,15 @@ public Version getMinimalSupportedVersion() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(storeResult);
out.writeCollection(indexGroups, StreamOutput::writeStringCollection);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(STORE_RESULT, storeResult);
builder.field(INDEX_GROUPS, indexGroups);
builder.field(HEADERS, headers);
return builder.endObject();
}
Expand All @@ -89,6 +99,10 @@ public boolean shouldStoreResult() {
return storeResult;
}

public List<? extends Collection<String>> getIndexGroups() {
return indexGroups;
}

public Map<String, String> getHeaders() {
return headers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
Expand All @@ -41,17 +45,27 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TransportStartReindexTaskAction
extends HandledTransportAction<StartReindexTaskAction.Request, StartReindexTaskAction.Response> {

private final List<String> headersToInclude;
private final ThreadPool threadPool;
private final PersistentTasksService persistentTasksService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final ClusterService clusterService;
private final ReindexValidator reindexValidator;
private final ReindexIndexClient reindexIndexClient;

Expand All @@ -63,6 +77,8 @@ public TransportStartReindexTaskAction(Settings settings, Client client, Transpo
super(StartReindexTaskAction.NAME, transportService, actionFilters, StartReindexTaskAction.Request::new);
this.headersToInclude = ReindexHeaders.REINDEX_INCLUDED_HEADERS.get(settings);
this.threadPool = threadPool;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.clusterService = clusterService;
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
this.persistentTasksService = persistentTasksService;
this.reindexIndexClient = new ReindexIndexClient(client, clusterService, xContentRegistry);
Expand All @@ -87,7 +103,7 @@ protected void doExecute(Task task, StartReindexTaskAction.Request request, Acti

// In the current implementation, we only need to store task results if we do not wait for completion
boolean storeTaskResult = request.getWaitForCompletion() == false;
ReindexTaskParams job = new ReindexTaskParams(storeTaskResult, included);
ReindexTaskParams job = new ReindexTaskParams(storeTaskResult, resolveIndexPatterns(request.getReindexRequest()), included);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to be putting this in the cluster state? I don't have a sense for how large this gets, but I assume it could go in the index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this is clearly wrong, fixed in 33d8fd4.


ReindexTaskStateDoc reindexState = new ReindexTaskStateDoc(request.getReindexRequest());
reindexIndexClient.createReindexTaskDoc(generatedId, reindexState, new ActionListener<>() {
Expand Down Expand Up @@ -212,4 +228,164 @@ private boolean isDone(ReindexPersistentTaskState state) {
return state != null && state.isDone();
}
}

/**
* Resolve index patterns to ensure they do not start resolving differently during reindex failovers.
* Do not resolve aliases, since accessing the underlying indices is not semantically equivalent to accessing the alias.
* Within each index pattern, sort the resolved indices by create date, since this ensures that if we reindex from a pattern of indices,
* destination will receive oldest data first. This is in particular important if destination does rollover and it is time-based data.
*
* @return list of groups of indices/aliases that must be searched together.
*/
private List<Set<String>> resolveIndexPatterns(ReindexRequest request) {
return resolveIndexPatterns(request, clusterService.state(), indexNameExpressionResolver);
}

// visible for testing
static List<Set<String>> resolveIndexPatterns(ReindexRequest request, ClusterState clusterState,
IndexNameExpressionResolver indexNameResolver) {
if (request.getRemoteInfo() == null) {
return resolveIndexPatterns(request.getSearchRequest().indices(), clusterState, indexNameResolver);
} else {
return Collections.emptyList();
}
}

private static List<Set<String>> resolveIndexPatterns(String[] indices, ClusterState clusterState,
IndexNameExpressionResolver indexNameResolver) {
Set<String> resolvedNames = indexNameResolver.resolveExpressions(clusterState, indices);

List<IndexGroup> groups = Arrays.stream(indices)
.flatMap(expression -> resolveSingleIndexExpression(expression, resolvedNames::contains,clusterState, indexNameResolver))
.collect(Collectors.toList());

return resolveGroups(groups).stream().map(IndexGroup::newResolvedGroup).collect(Collectors.toList());
}

private static List<IndexGroup> resolveGroups(List<IndexGroup> groups) {
List<IndexGroup> result = new ArrayList<>(groups);

// n^2, but OK since data volume is low.
// reverse order since we bubble data towards the lower index end.
for (int i = result.size() - 1; i >= 0; --i) {
IndexGroup current = result.get(i);
for (int j = i - 1; current != null && j >= 0; --j) {
IndexGroup earlier = result.get(j);
Tuple<IndexGroup, IndexGroup> collapsed = earlier.collapse(current);
result.set(j, collapsed.v1());
current = collapsed.v2();
}
result.set(i, current);
}

return result.stream().filter(Objects::nonNull).collect(Collectors.toList());
}

private static Stream<IndexGroup> resolveSingleIndexExpression(String expression, Predicate<String> predicate,
ClusterState clusterState,
IndexNameExpressionResolver indexNameExpressionResolver) {
SortedMap<String, AliasOrIndex> lookup = clusterState.getMetaData().getAliasAndIndexLookup();
Comparator<AliasOrIndex> createDateIndexOrder = (i1, i2) -> {
if (i1.isAlias() && i2.isAlias()) {
return ((AliasOrIndex.Alias) i1).getAliasName().compareTo(((AliasOrIndex.Alias) i2).getAliasName());
}
if (i1.isAlias() != i2.isAlias()) {
return Boolean.compare(i1.isAlias(), i2.isAlias());
}

assert i1.getIndices().size() == 1;
assert i2.getIndices().size() == 1;
IndexMetaData indexMetaData1 = i1.getIndices().get(0);
IndexMetaData indexMetaData2 = i2.getIndices().get(0);
int compare = Long.compare(indexMetaData1.getCreationDate(), indexMetaData2.getCreationDate());
return compare != 0 ? compare : indexMetaData1.getIndex().getName().compareTo(indexMetaData2.getIndex().getName());
};

return indexNameExpressionResolver.resolveExpressions(clusterState, expression).stream()
.filter(predicate).map(lookup::get).sorted(createDateIndexOrder).map(IndexGroup::create);
}

/**
* Immutable group of indices and aliases.
*/
private static class IndexGroup {
private final Set<String> indices;
private final Set<String> allIndices;
private final Set<AliasOrIndex.Alias> aliases;
private final List<String> orderedIndices;

private IndexGroup(List<String> indices, Set<AliasOrIndex.Alias> aliases) {
orderedIndices = indices;
this.indices = indices.stream().collect(Collectors.toUnmodifiableSet());
this.aliases = aliases;
this.allIndices = Stream.concat(indices.stream(),
aliases.stream().flatMap(aliasOrIndex -> aliasOrIndex.getIndices().stream())
.map(imd -> imd.getIndex().getName())
).collect(Collectors.toUnmodifiableSet());
}

private IndexGroup(IndexGroup group1, IndexGroup group2) {
this(Stream.concat(group1.orderedIndices.stream(), group2.orderedIndices.stream()).collect(Collectors.toList()),
Stream.concat(group1.aliases.stream(), group2.aliases.stream())
.collect(Collectors.toSet()));
}

public static IndexGroup create(AliasOrIndex aliasOrIndex) {
if (aliasOrIndex.isAlias()) {
return new IndexGroup(Collections.emptyList(), Collections.singleton((AliasOrIndex.Alias) aliasOrIndex));
} else {
return new IndexGroup(Collections.singletonList(aliasOrIndex.getIndices().get(0).getIndex().getName()),
Collections.emptySet());
}
}

private Tuple<IndexGroup, IndexGroup> collapse(IndexGroup other) {
if (other == this) {
return Tuple.tuple(this, this);
}

if (aliasOverlap(this.aliases, other.allIndices) || aliasOverlap(other.aliases, this.allIndices)) {
return Tuple.tuple(new IndexGroup(this, other), null);
}

Set<String> intersection = Sets.intersection(indices, other.indices);
assert intersection.isEmpty() == false || Sets.intersection(allIndices, other.allIndices).isEmpty();
assert intersection.isEmpty() || Sets.intersection(allIndices, other.allIndices).isEmpty() == false;
return Tuple.tuple(this.add(intersection, orderedIndices), other.subtract(intersection));
}

private IndexGroup add(Set<String> intersection, List<String> order) {
if (intersection.isEmpty()) {
return this;
}

List<String> indices =
Stream.concat(this.orderedIndices.stream(), order.stream().filter(intersection::contains)).collect(Collectors.toList());
return new IndexGroup(indices, aliases);
}

private IndexGroup subtract(Set<String> intersection) {
if (intersection.isEmpty()) {
return this;
}

List<String> indices =
this.orderedIndices.stream().filter(Predicate.not(intersection::contains)).collect(Collectors.toList());

if (indices.isEmpty()) {
return null;
}
return new IndexGroup(indices, aliases);
}

private static boolean aliasOverlap(Set<AliasOrIndex.Alias> aliases, Set<String> indices) {
return aliases.stream()
.flatMap(aliasOrIndex -> aliasOrIndex.getIndices().stream()).map(imd -> imd.getIndex().getName())
.anyMatch(indices::contains);
}

public Set<String> newResolvedGroup() {
return Stream.concat(indices.stream(), aliases.stream().map(AliasOrIndex.Alias::getAliasName)).collect(Collectors.toSet());
}
}
}
Loading