From fd44585a48a7e4ad3834d40aa87c0f1e47a92c38 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 15 Nov 2017 16:00:38 +0100 Subject: [PATCH 1/3] Fix `ShardSplittingQuery` to respect nested documents. Today if nested docs are used in an index that is split the operation will only work correctly if the index is not routing partitioned or unless routing is used. This change fixes the query that selectes the docs to delete to also select all parents nested docs as well. Closes #27378 --- .../index/shard/ShardSplittingQuery.java | 120 ++++++++++-- .../index/shard/StoreRecovery.java | 10 +- .../admin/indices/create/SplitIndexIT.java | 69 +++++-- .../index/shard/ShardSplittingQueryTests.java | 178 ++++++++++++++---- .../index/shard/StoreRecoveryTests.java | 4 +- 5 files changed, 313 insertions(+), 68 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java b/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java index 94aee085175a0..e6892af517b02 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java @@ -19,9 +19,11 @@ package org.elasticsearch.index.shard; import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexReaderContext; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.StoredFieldVisitor; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; @@ -33,19 +35,22 @@ import org.apache.lucene.search.Scorer; import org.apache.lucene.search.TwoPhaseIterator; import org.apache.lucene.search.Weight; +import org.apache.lucene.search.join.BitSetProducer; +import org.apache.lucene.util.BitSet; import org.apache.lucene.util.BitSetIterator; -import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.mapper.Uid; import java.io.IOException; import java.util.function.IntConsumer; +import java.util.function.IntPredicate; import java.util.function.Predicate; /** @@ -56,16 +61,21 @@ final class ShardSplittingQuery extends Query { private final IndexMetaData indexMetaData; private final int shardId; + private final BitSetProducer nestedParentBitSetProducer; - ShardSplittingQuery(IndexMetaData indexMetaData, int shardId) { + ShardSplittingQuery(IndexMetaData indexMetaData, int shardId, boolean hasNested) { + this(indexMetaData, shardId, hasNested ? newParentDocBitSetProducer() : null); + } + + private ShardSplittingQuery(IndexMetaData indexMetaData, int shardId, BitSetProducer nestedParentBitSetProducer) { if (indexMetaData.getCreationVersion().before(Version.V_6_0_0_rc2)) { throw new IllegalArgumentException("Splitting query can only be executed on an index created with version " + Version.V_6_0_0_rc2 + " or higher"); } this.indexMetaData = indexMetaData; this.shardId = shardId; + this.nestedParentBitSetProducer = nestedParentBitSetProducer; } - @Override public Weight createWeight(IndexSearcher searcher, boolean needsScores, float boost) { return new ConstantScoreWeight(this, boost) { @@ -84,44 +94,85 @@ public Scorer scorer(LeafReaderContext context) throws IOException { Uid.decodeId(ref.bytes, ref.offset, ref.length), null); return shardId == targetShardId; }; - if (terms == null) { // this is the common case - no partitioning and no _routing values + if (terms == null) { + // this is the common case - no partitioning and no _routing values + // in this case we also don't do anything special with regards to nested docs since we basically delete + // by ID and parent and nested all have the same id. assert indexMetaData.isRoutingPartitionedIndex() == false; findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, bitSet::set); } else { + final BitSet parentBitSet; + final IntPredicate includeDoc; + if (nestedParentBitSetProducer == null) { + parentBitSet = null; + includeDoc = i -> true; + } else { + parentBitSet = nestedParentBitSetProducer.getBitSet(context); + if (parentBitSet == null) { + return null; // no matches + } + includeDoc = parentBitSet::get; + } if (indexMetaData.isRoutingPartitionedIndex()) { // this is the heaviest invariant. Here we have to visit all docs stored fields do extract _id and _routing // this this index is routing partitioned. Visitor visitor = new Visitor(); - return new ConstantScoreScorer(this, score(), - new RoutingPartitionedDocIdSetIterator(leafReader, visitor)); + TwoPhaseIterator twoPhaseIterator = + parentBitSet == null ? new RoutingPartitionedDocIdSetIterator(leafReader, visitor) : + new NestedRoutingPartitionedDocIdSetIterator(leafReader, visitor, parentBitSet); + return new ConstantScoreScorer(this, score(), twoPhaseIterator); } else { // in the _routing case we first go and find all docs that have a routing value and mark the ones we have to delete findSplitDocs(RoutingFieldMapper.NAME, ref -> { int targetShardId = OperationRouting.generateShardId(indexMetaData, null, ref.utf8ToString()); return shardId == targetShardId; - }, leafReader, bitSet::set); + }, leafReader, docId -> { + if (includeDoc.test(docId)) { + bitSet.set(docId); + } + }); + // now if we have a mixed index where some docs have a _routing value and some don't we have to exclude the ones // with a routing value from the next iteration an delete / select based on the ID. if (terms.getDocCount() != leafReader.maxDoc()) { // this is a special case where some of the docs have no routing values this sucks but it's possible today FixedBitSet hasRoutingValue = new FixedBitSet(leafReader.maxDoc()); findSplitDocs(RoutingFieldMapper.NAME, ref -> false, leafReader, - hasRoutingValue::set); + docId -> { + if (includeDoc.test(docId)) { + hasRoutingValue.set(docId); + } + }); + findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, docId -> { - if (hasRoutingValue.get(docId) == false) { + if (hasRoutingValue.get(docId) == false && includeDoc.test(docId)) { bitSet.set(docId); } }); } } + if (parentBitSet != null) { + markChildDocs(parentBitSet, bitSet); + } } + return new ConstantScoreScorer(this, score(), new BitSetIterator(bitSet, bitSet.length())); } - - }; } + private void markChildDocs(BitSet parentDocs, BitSet deletedDocs) { + int currentDeleted = 0; + while (currentDeleted < deletedDocs.length() && + (currentDeleted = deletedDocs.nextSetBit(currentDeleted)) != DocIdSetIterator.NO_MORE_DOCS) { + int previousParent = parentDocs.prevSetBit(Math.max(0, currentDeleted-1)); + for (int i = previousParent + 1; i < currentDeleted; i++) { + deletedDocs.set(i); + } + currentDeleted++; + } + } + @Override public String toString(String field) { return "shard_splitting_query"; @@ -216,7 +267,7 @@ public Status needsField(FieldInfo fieldInfo) throws IOException { * This two phase iterator visits every live doc and selects all docs that don't belong into this * shard based on their id and routing value. This is only used in a routing partitioned index. */ - private final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator { + private class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator { private final LeafReader leafReader; private final Visitor visitor; @@ -228,7 +279,10 @@ private final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator @Override public boolean matches() throws IOException { - int doc = approximation.docID(); + return innerMatches(approximation.docID()); + } + + protected boolean innerMatches(int doc) throws IOException { visitor.reset(); leafReader.document(doc, visitor); int targetShardId = OperationRouting.generateShardId(indexMetaData, visitor.id, visitor.routing); @@ -240,6 +294,46 @@ public float matchCost() { return 42; // that's obvious, right? } } + + /** + * This TwoPhaseIterator marks all nested docs of matching parents as matches as well. + */ + private final class NestedRoutingPartitionedDocIdSetIterator extends RoutingPartitionedDocIdSetIterator { + private final BitSet parentDocs; + private int nextParent = -1; + private boolean nextParentMatches; + + NestedRoutingPartitionedDocIdSetIterator(LeafReader leafReader, Visitor visitor, BitSet parentDocs) { + super(leafReader, visitor); + this.parentDocs = parentDocs; + } + + @Override + public boolean matches() throws IOException { + int doc = approximation.docID(); + if (doc > nextParent) { + nextParent = parentDocs.nextSetBit(doc); + nextParentMatches = innerMatches(nextParent); + } + return nextParentMatches; + } + } + + /* + * this is used internally to obtain a bitset for parent documents. We don't cache this since we never access the same reader more + * than once + */ + private static BitSetProducer newParentDocBitSetProducer() { + return context -> { + Query query = Queries.newNonNestedFilter(); + final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); + final IndexSearcher searcher = new IndexSearcher(topLevelContext); + searcher.setQueryCache(null); + final Weight weight = searcher.createNormalizedWeight(query, false); + Scorer s = weight.scorer(context); + return s == null ? null : BitSet.of(s.iterator(), context.reader().maxDoc()); + }; + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index b59ab14961769..9216075e82238 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -115,6 +115,7 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate indexShard.mapperService().merge(sourceMetaData, MapperService.MergeReason.MAPPING_RECOVERY, true); // now that the mapping is merged we can validate the index sort configuration. Sort indexSort = indexShard.getIndexSort(); + final boolean hasNested = indexShard.mapperService().hasNested(); final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards(); assert isSplit == false || sourceMetaData.getCreationVersion().onOrAfter(Version.V_6_0_0_alpha1) : "for split we require a " + "single type but the index is created before 6.0.0"; @@ -127,7 +128,7 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate final long maxUnsafeAutoIdTimestamp = shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong(); addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp, - indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit); + indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit, hasNested); internalRecoverFromStore(indexShard); // just trigger a merge to do housekeeping on the // copied segments - we will also see them in stats etc. @@ -142,8 +143,8 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate } void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources, - final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split) - throws IOException { + final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split, + boolean hasNested) throws IOException { final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target); IndexWriterConfig iwc = new IndexWriterConfig(null) .setCommitOnClose(false) @@ -158,9 +159,8 @@ void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory ta try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) { writer.addIndexes(sources); - if (split) { - writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId)); + writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId, hasNested)); } /* * We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java index ebd647d0e02fd..995d86c3939ba 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java @@ -23,6 +23,7 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedSetSelector; import org.apache.lucene.search.SortedSetSortField; +import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -57,15 +58,23 @@ import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.VersionUtils; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.IntFunction; import java.util.stream.IntStream; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -78,13 +87,14 @@ protected Collection> nodePlugins() { return Arrays.asList(InternalSettingsPlugin.class); } - public void testCreateSplitIndexToN() { + public void testCreateSplitIndexToN() throws IOException { int[][] possibleShardSplits = new int[][] {{2,4,8}, {3, 6, 12}, {1, 2, 4}}; int[] shardSplits = randomFrom(possibleShardSplits); assertEquals(shardSplits[0], (shardSplits[0] * shardSplits[1]) / shardSplits[1]); assertEquals(shardSplits[1], (shardSplits[1] * shardSplits[2]) / shardSplits[2]); internalCluster().ensureAtLeastNumDataNodes(2); final boolean useRouting = randomBoolean(); + final boolean useNested = randomBoolean(); final boolean useMixedRouting = useRouting ? randomBoolean() : false; CreateIndexRequestBuilder createInitialIndex = prepareCreate("source"); final int routingShards = shardSplits[2] * randomIntBetween(1, 10); @@ -93,16 +103,43 @@ public void testCreateSplitIndexToN() { .put("index.number_of_routing_shards", routingShards); if (useRouting && useMixedRouting == false && randomBoolean()) { settings.put("index.routing_partition_size", randomIntBetween(1, routingShards - 1)); - createInitialIndex.addMapping("t1", "_routing", "required=true"); + if (useNested) { + createInitialIndex.addMapping("t1", "_routing", "required=true", "nested1", "type=nested"); + } else { + createInitialIndex.addMapping("t1", "_routing", "required=true"); + } + } else if (useNested) { + createInitialIndex.addMapping("t1", "nested1", "type=nested"); } - logger.info("use routing {} use mixed routing {}", useRouting, useMixedRouting); + logger.info("use routing {} use mixed routing {} use nested {}", useRouting, useMixedRouting, useNested); createInitialIndex.setSettings(settings).get(); int numDocs = randomIntBetween(10, 50); String[] routingValue = new String[numDocs]; + + BiFunction indexFunc = (index, id) -> { + try { + return client().prepareIndex(index, "t1", Integer.toString(id)) + .setSource(jsonBuilder().startObject() + .field("foo", "bar") + .field("i", id) + .startArray("nested1") + .startObject() + .field("n_field1", "n_value1_1") + .field("n_field2", "n_value2_1") + .endObject() + .startObject() + .field("n_field1", "n_value1_2") + .field("n_field2", "n_value2_2") + .endObject() + .endArray() + .endObject()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; for (int i = 0; i < numDocs; i++) { - IndexRequestBuilder builder = client().prepareIndex("source", "t1", Integer.toString(i)) - .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON); + IndexRequestBuilder builder = indexFunc.apply("source", i); if (useRouting) { String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); if (useMixedRouting && randomBoolean()) { @@ -118,8 +155,7 @@ public void testCreateSplitIndexToN() { if (randomBoolean()) { for (int i = 0; i < numDocs; i++) { // let's introduce some updates / deletes on the index if (randomBoolean()) { - IndexRequestBuilder builder = client().prepareIndex("source", "t1", Integer.toString(i)) - .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON); + IndexRequestBuilder builder = indexFunc.apply("source", i); if (useRouting) { builder.setRouting(routingValue[i]); } @@ -145,8 +181,7 @@ public void testCreateSplitIndexToN() { assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); for (int i = 0; i < numDocs; i++) { // now update - IndexRequestBuilder builder = client().prepareIndex("first_split", "t1", Integer.toString(i)) - .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON); + IndexRequestBuilder builder = indexFunc.apply("first_split", i); if (useRouting) { builder.setRouting(routingValue[i]); } @@ -180,8 +215,7 @@ public void testCreateSplitIndexToN() { assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); for (int i = 0; i < numDocs; i++) { // now update - IndexRequestBuilder builder = client().prepareIndex("second_split", "t1", Integer.toString(i)) - .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON); + IndexRequestBuilder builder = indexFunc.apply("second_split", i); if (useRouting) { builder.setRouting(routingValue[i]); } @@ -195,14 +229,25 @@ public void testCreateSplitIndexToN() { assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); - + if (useNested) { + assertNested("source", numDocs); + assertNested("first_split", numDocs); + assertNested("second_split", numDocs); + } assertAllUniqueDocs(client().prepareSearch("second_split").setSize(100) .setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); assertAllUniqueDocs(client().prepareSearch("first_split").setSize(100) .setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); assertAllUniqueDocs(client().prepareSearch("source").setSize(100) .setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + } + public void assertNested(String index, int numDocs) { + // now, do a nested query + SearchResponse searchResponse = client().prepareSearch(index).setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", + "n_value1_1"), ScoreMode.Avg)).get(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits(), equalTo((long)numDocs)); } public void assertAllUniqueDocs(SearchResponse response, int numDocs) { diff --git a/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java b/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java index 7351372620fc9..203a8d578ee80 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java @@ -23,6 +23,7 @@ import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.SortedNumericDocValues; @@ -38,10 +39,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.RoutingFieldMapper; +import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -58,18 +61,36 @@ public void testSplitOnID() throws IOException { .setRoutingNumShards(numShards * 1000000) .numberOfReplicas(0).build(); int targetShardId = randomIntBetween(0, numShards-1); + boolean hasNested = randomBoolean(); for (int j = 0; j < numDocs; j++) { int shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), null); - writer.addDocument(Arrays.asList( - new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), - new SortedNumericDocValuesField("shard_id", shardId) - )); + if (hasNested) { + List> docs = new ArrayList<>(); + int numNested = randomIntBetween(0, 10); + for (int i = 0; i < numNested; i++) { + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + writer.addDocuments(docs); + } else { + writer.addDocument(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } } writer.commit(); writer.close(); - assertSplit(dir, metaData, targetShardId); + assertSplit(dir, metaData, targetShardId, hasNested); dir.close(); } @@ -83,19 +104,38 @@ public void testSplitOnRouting() throws IOException { .numberOfShards(numShards) .setRoutingNumShards(numShards * 1000000) .numberOfReplicas(0).build(); + boolean hasNested = true || randomBoolean(); int targetShardId = randomIntBetween(0, numShards-1); for (int j = 0; j < numDocs; j++) { String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 5); final int shardId = OperationRouting.generateShardId(metaData, null, routing); - writer.addDocument(Arrays.asList( - new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), - new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), - new SortedNumericDocValuesField("shard_id", shardId) - )); + if (hasNested) { + List> docs = new ArrayList<>(); + int numNested = randomIntBetween(0, 10); + for (int i = 0; i < numNested; i++) { + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + writer.addDocuments(docs); + } else { + writer.addDocument(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } } writer.commit(); writer.close(); - assertSplit(dir, metaData, targetShardId); + assertSplit(dir, metaData, targetShardId, hasNested); dir.close(); } @@ -103,33 +143,52 @@ public void testSplitOnIdOrRouting() throws IOException { Directory dir = newFSDirectory(createTempDir()); final int numDocs = randomIntBetween(50, 100); RandomIndexWriter writer = new RandomIndexWriter(random(), dir); - int numShards = randomIntBetween(2, 10); + int numShards = randomIntBetween(2, 10); IndexMetaData metaData = IndexMetaData.builder("test") .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) .numberOfShards(numShards) .setRoutingNumShards(numShards * 1000000) .numberOfReplicas(0).build(); + boolean hasNested = true;randomBoolean(); int targetShardId = randomIntBetween(0, numShards-1); for (int j = 0; j < numDocs; j++) { + Iterable rootDoc; + final int shardId; if (randomBoolean()) { String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 5); - final int shardId = OperationRouting.generateShardId(metaData, null, routing); - writer.addDocument(Arrays.asList( + shardId = OperationRouting.generateShardId(metaData, null, routing); + rootDoc = Arrays.asList( new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), new SortedNumericDocValuesField("shard_id", shardId) - )); + ); } else { - int shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), null); - writer.addDocument(Arrays.asList( + shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), null); + rootDoc = Arrays.asList( new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), new SortedNumericDocValuesField("shard_id", shardId) - )); + ); + } + + if (hasNested) { + List> docs = new ArrayList<>(); + int numNested = randomIntBetween(0, 10); + for (int i = 0; i < numNested; i++) { + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } + docs.add(rootDoc); + writer.addDocuments(docs); + } else { + writer.addDocument(rootDoc); } } writer.commit(); writer.close(); - assertSplit(dir, metaData, targetShardId); + assertSplit(dir, metaData, targetShardId, hasNested); dir.close(); } @@ -145,47 +204,94 @@ public void testSplitOnRoutingPartitioned() throws IOException { .setRoutingNumShards(numShards * 1000000) .routingPartitionSize(randomIntBetween(1, 10)) .numberOfReplicas(0).build(); + boolean hasNested = randomBoolean(); int targetShardId = randomIntBetween(0, numShards-1); for (int j = 0; j < numDocs; j++) { String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 5); final int shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), routing); - writer.addDocument(Arrays.asList( - new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), - new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), - new SortedNumericDocValuesField("shard_id", shardId) - )); + + if (hasNested) { + List> docs = new ArrayList<>(); + int numNested = randomIntBetween(0, 10); + for (int i = 0; i < numNested; i++) { + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + writer.addDocuments(docs); + } else { + writer.addDocument(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } } writer.commit(); writer.close(); - assertSplit(dir, metaData, targetShardId); + assertSplit(dir, metaData, targetShardId, hasNested); dir.close(); } - void assertSplit(Directory dir, IndexMetaData metaData, int targetShardId) throws IOException { + void assertSplit(Directory dir, IndexMetaData metaData, int targetShardId, boolean hasNested) throws IOException { try (IndexReader reader = DirectoryReader.open(dir)) { IndexSearcher searcher = new IndexSearcher(reader); searcher.setQueryCache(null); final boolean needsScores = false; - final Weight splitWeight = searcher.createNormalizedWeight(new ShardSplittingQuery(metaData, targetShardId), needsScores); + final Weight splitWeight = searcher.createNormalizedWeight(new ShardSplittingQuery(metaData, targetShardId, hasNested), + needsScores); final List leaves = reader.leaves(); for (final LeafReaderContext ctx : leaves) { Scorer scorer = splitWeight.scorer(ctx); DocIdSetIterator iterator = scorer.iterator(); SortedNumericDocValues shard_id = ctx.reader().getSortedNumericDocValues("shard_id"); - int doc; - while ((doc = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - while (shard_id.nextDoc() < doc) { + int numExpected = 0; + while (shard_id.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (targetShardId == shard_id.nextValue()) { + numExpected++; + } + } + if (numExpected == ctx.reader().maxDoc()) { + // all docs belong in this shard + assertEquals(DocIdSetIterator.NO_MORE_DOCS, iterator.nextDoc()); + } else { + shard_id = ctx.reader().getSortedNumericDocValues("shard_id"); + int doc; + int numActual = 0; + int lastDoc = 0; + while ((doc = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + lastDoc = doc; + while (shard_id.nextDoc() < doc) { + long shardID = shard_id.nextValue(); + assertEquals(shardID, targetShardId); + numActual++; + } + assertEquals(shard_id.docID(), doc); long shardID = shard_id.nextValue(); - assertEquals(shardID, targetShardId); + BytesRef id = reader.document(doc).getBinaryValue("_id"); + String actualId = Uid.decodeId(id.bytes, id.offset, id.length); + assertNotEquals(ctx.reader() + " docID: " + doc + " actualID: " + actualId, shardID, targetShardId); + } + if (lastDoc < ctx.reader().maxDoc()) { + // check the last docs in the segment and make sure they all have the right shard id + while (shard_id.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + long shardID = shard_id.nextValue(); + assertEquals(shardID, targetShardId); + numActual++; + } } - assertEquals(shard_id.docID(), doc); - long shardID = shard_id.nextValue(); - BytesRef id = reader.document(doc).getBinaryValue("_id"); - String actualId = Uid.decodeId(id.bytes, id.offset, id.length); - assertNotEquals(ctx.reader() + " docID: " + doc + " actualID: " + actualId, shardID, targetShardId); + + assertEquals(numExpected, numActual); } } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java b/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java index 05b092ff3a461..61d6a5a845bf8 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java @@ -100,7 +100,7 @@ public void testAddIndices() throws IOException { Directory target = newFSDirectory(createTempDir()); final long maxSeqNo = randomNonNegativeLong(); final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong(); - storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo, maxUnsafeAutoIdTimestamp, null, 0, false); + storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo, maxUnsafeAutoIdTimestamp, null, 0, false, false); int numFiles = 0; Predicate filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false && f.startsWith("extra") == false; @@ -174,7 +174,7 @@ public void testSplitShard() throws IOException { .setRoutingNumShards(numShards * 1000000) .numberOfReplicas(0).build(); storeRecovery.addIndices(indexStats, target, indexSort, new Directory[] {dir}, maxSeqNo, maxUnsafeAutoIdTimestamp, metaData, - targetShardId, true); + targetShardId, true, false); SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(target); From 7aafdd604f709d77412ef9a3f8118ebcb2ac6adc Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 16 Nov 2017 09:48:40 +0100 Subject: [PATCH 2/3] apply review comments and simplify code --- .../index/shard/ShardSplittingQuery.java | 124 ++++++++++-------- .../index/shard/ShardSplittingQueryTests.java | 4 +- 2 files changed, 69 insertions(+), 59 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java b/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java index e6892af517b02..92cbdff10fef3 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.mapper.Uid; import java.io.IOException; +import java.util.function.Function; import java.util.function.IntConsumer; import java.util.function.IntPredicate; import java.util.function.Predicate; @@ -64,17 +65,13 @@ final class ShardSplittingQuery extends Query { private final BitSetProducer nestedParentBitSetProducer; ShardSplittingQuery(IndexMetaData indexMetaData, int shardId, boolean hasNested) { - this(indexMetaData, shardId, hasNested ? newParentDocBitSetProducer() : null); - } - - private ShardSplittingQuery(IndexMetaData indexMetaData, int shardId, BitSetProducer nestedParentBitSetProducer) { if (indexMetaData.getCreationVersion().before(Version.V_6_0_0_rc2)) { throw new IllegalArgumentException("Splitting query can only be executed on an index created with version " + Version.V_6_0_0_rc2 + " or higher"); } this.indexMetaData = indexMetaData; this.shardId = shardId; - this.nestedParentBitSetProducer = nestedParentBitSetProducer; + this.nestedParentBitSetProducer = hasNested ? newParentDocBitSetProducer() : null; } @Override public Weight createWeight(IndexSearcher searcher, boolean needsScores, float boost) { @@ -102,56 +99,56 @@ public Scorer scorer(LeafReaderContext context) throws IOException { findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, bitSet::set); } else { final BitSet parentBitSet; - final IntPredicate includeDoc; if (nestedParentBitSetProducer == null) { parentBitSet = null; - includeDoc = i -> true; } else { parentBitSet = nestedParentBitSetProducer.getBitSet(context); if (parentBitSet == null) { return null; // no matches } - includeDoc = parentBitSet::get; } if (indexMetaData.isRoutingPartitionedIndex()) { // this is the heaviest invariant. Here we have to visit all docs stored fields do extract _id and _routing // this this index is routing partitioned. - Visitor visitor = new Visitor(); + Visitor visitor = new Visitor(leafReader); TwoPhaseIterator twoPhaseIterator = - parentBitSet == null ? new RoutingPartitionedDocIdSetIterator(leafReader, visitor) : - new NestedRoutingPartitionedDocIdSetIterator(leafReader, visitor, parentBitSet); + parentBitSet == null ? new RoutingPartitionedDocIdSetIterator(visitor) : + new NestedRoutingPartitionedDocIdSetIterator(visitor, parentBitSet); return new ConstantScoreScorer(this, score(), twoPhaseIterator); } else { + // here we potentially guard the docID consumers with our parent bitset if we have one. + Function maybeWrapConsumer = consumer -> { + if (parentBitSet != null) { + return docId -> { + if (parentBitSet.get(docId)) { + consumer.accept(docId); + } + }; + } + return consumer; + }; // in the _routing case we first go and find all docs that have a routing value and mark the ones we have to delete findSplitDocs(RoutingFieldMapper.NAME, ref -> { int targetShardId = OperationRouting.generateShardId(indexMetaData, null, ref.utf8ToString()); return shardId == targetShardId; - }, leafReader, docId -> { - if (includeDoc.test(docId)) { - bitSet.set(docId); - } - }); + }, leafReader, maybeWrapConsumer.apply(bitSet::set)); // now if we have a mixed index where some docs have a _routing value and some don't we have to exclude the ones // with a routing value from the next iteration an delete / select based on the ID. if (terms.getDocCount() != leafReader.maxDoc()) { // this is a special case where some of the docs have no routing values this sucks but it's possible today FixedBitSet hasRoutingValue = new FixedBitSet(leafReader.maxDoc()); - findSplitDocs(RoutingFieldMapper.NAME, ref -> false, leafReader, - docId -> { - if (includeDoc.test(docId)) { - hasRoutingValue.set(docId); - } - }); - + findSplitDocs(RoutingFieldMapper.NAME, ref -> false, leafReader, maybeWrapConsumer.apply(hasRoutingValue::set)); + IntConsumer bitSetConsumer = maybeWrapConsumer.apply(bitSet::set); findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, docId -> { - if (hasRoutingValue.get(docId) == false && includeDoc.test(docId)) { - bitSet.set(docId); + if (hasRoutingValue.get(docId) == false) { + bitSetConsumer.accept(docId); } }); } } if (parentBitSet != null) { + // if nested docs are involved we also need to mark all child docs that belong to a matching parent doc. markChildDocs(parentBitSet, bitSet); } } @@ -161,13 +158,13 @@ public Scorer scorer(LeafReaderContext context) throws IOException { }; } - private void markChildDocs(BitSet parentDocs, BitSet deletedDocs) { + private void markChildDocs(BitSet parentDocs, BitSet matchingDocs) { int currentDeleted = 0; - while (currentDeleted < deletedDocs.length() && - (currentDeleted = deletedDocs.nextSetBit(currentDeleted)) != DocIdSetIterator.NO_MORE_DOCS) { + while (currentDeleted < matchingDocs.length() && + (currentDeleted = matchingDocs.nextSetBit(currentDeleted)) != DocIdSetIterator.NO_MORE_DOCS) { int previousParent = parentDocs.prevSetBit(Math.max(0, currentDeleted-1)); for (int i = previousParent + 1; i < currentDeleted; i++) { - deletedDocs.set(i); + matchingDocs.set(i); } currentDeleted++; } @@ -196,8 +193,8 @@ public int hashCode() { return classHash() ^ result; } - private static void findSplitDocs(String idField, Predicate includeInShard, - LeafReader leafReader, IntConsumer consumer) throws IOException { + private static void findSplitDocs(String idField, Predicate includeInShard, LeafReader leafReader, + IntConsumer consumer) throws IOException { Terms terms = leafReader.terms(idField); TermsEnum iterator = terms.iterator(); BytesRef idTerm; @@ -213,15 +210,17 @@ private static void findSplitDocs(String idField, Predicate includeInS } } - private static final class Visitor extends StoredFieldVisitor { - int leftToVisit = 2; - final BytesRef spare = new BytesRef(); - String routing; - String id; + /* this class is a stored fields visitor that reads _id and/or _routing from the stored fields which is necessary in the case + of a routing partitioned index sine otherwise we would need to un-invert the _id and _routing field which is memory heavy */ + private final class Visitor extends StoredFieldVisitor { + final LeafReader leafReader; + private int leftToVisit = 2; + private final BytesRef spare = new BytesRef(); + private String routing; + private String id; - void reset() { - routing = id = null; - leftToVisit = 2; + Visitor(LeafReader leafReader) { + this.leafReader = leafReader; } @Override @@ -261,32 +260,32 @@ public Status needsField(FieldInfo fieldInfo) throws IOException { return leftToVisit == 0 ? Status.STOP : Status.NO; } } + + boolean matches(int doc) throws IOException { + routing = id = null; + leftToVisit = 2; + leafReader.document(doc, this); + assert id != null : "docID must not be null - we might have hit a nested document"; + int targetShardId = OperationRouting.generateShardId(indexMetaData, id, routing); + return targetShardId != shardId; + } } /** * This two phase iterator visits every live doc and selects all docs that don't belong into this * shard based on their id and routing value. This is only used in a routing partitioned index. */ - private class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator { - private final LeafReader leafReader; + private static final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator { private final Visitor visitor; - RoutingPartitionedDocIdSetIterator(LeafReader leafReader, Visitor visitor) { - super(DocIdSetIterator.all(leafReader.maxDoc())); // we iterate all live-docs - this.leafReader = leafReader; + RoutingPartitionedDocIdSetIterator(Visitor visitor) { + super(DocIdSetIterator.all(visitor.leafReader.maxDoc())); // we iterate all live-docs this.visitor = visitor; } @Override public boolean matches() throws IOException { - return innerMatches(approximation.docID()); - } - - protected boolean innerMatches(int doc) throws IOException { - visitor.reset(); - leafReader.document(doc, visitor); - int targetShardId = OperationRouting.generateShardId(indexMetaData, visitor.id, visitor.routing); - return targetShardId != shardId; + return visitor.matches(approximation.docID()); } @Override @@ -298,30 +297,41 @@ public float matchCost() { /** * This TwoPhaseIterator marks all nested docs of matching parents as matches as well. */ - private final class NestedRoutingPartitionedDocIdSetIterator extends RoutingPartitionedDocIdSetIterator { + private static final class NestedRoutingPartitionedDocIdSetIterator extends TwoPhaseIterator { + private final Visitor visitor; private final BitSet parentDocs; private int nextParent = -1; private boolean nextParentMatches; - NestedRoutingPartitionedDocIdSetIterator(LeafReader leafReader, Visitor visitor, BitSet parentDocs) { - super(leafReader, visitor); + NestedRoutingPartitionedDocIdSetIterator(Visitor visitor, BitSet parentDocs) { + super(DocIdSetIterator.all(visitor.leafReader.maxDoc())); // we iterate all live-docs this.parentDocs = parentDocs; + this.visitor = visitor; } @Override public boolean matches() throws IOException { + // the educated reader might ask why this works, it does because all live doc ids (root docs and nested docs) are evaluated in + // order and that way we don't need to seek backwards as we do in other nested docs cases. int doc = approximation.docID(); if (doc > nextParent) { + // we only check once per nested/parent set nextParent = parentDocs.nextSetBit(doc); - nextParentMatches = innerMatches(nextParent); + nextParentMatches = visitor.matches(nextParent); } return nextParentMatches; } + + @Override + public float matchCost() { + return 42; // that's obvious, right? + } } /* * this is used internally to obtain a bitset for parent documents. We don't cache this since we never access the same reader more - * than once + * than once. There is no point in using BitsetFilterCache#BitSetProducerWarmer since we use this only as a delete by query which is + * executed on a recovery-private index writer. There is no point in caching it and it won't have a cache hit either. */ private static BitSetProducer newParentDocBitSetProducer() { return context -> { diff --git a/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java b/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java index 203a8d578ee80..c0b492b0cb67d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java @@ -104,7 +104,7 @@ public void testSplitOnRouting() throws IOException { .numberOfShards(numShards) .setRoutingNumShards(numShards * 1000000) .numberOfReplicas(0).build(); - boolean hasNested = true || randomBoolean(); + boolean hasNested = randomBoolean(); int targetShardId = randomIntBetween(0, numShards-1); for (int j = 0; j < numDocs; j++) { String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 5); @@ -149,7 +149,7 @@ public void testSplitOnIdOrRouting() throws IOException { .numberOfShards(numShards) .setRoutingNumShards(numShards * 1000000) .numberOfReplicas(0).build(); - boolean hasNested = true;randomBoolean(); + boolean hasNested = randomBoolean(); int targetShardId = randomIntBetween(0, numShards-1); for (int j = 0; j < numDocs; j++) { Iterable rootDoc; From cfa6cc2d27038bdd488706d5076e382af75b3fc1 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 16 Nov 2017 09:52:04 +0100 Subject: [PATCH 3/3] add more comments --- .../org/elasticsearch/index/shard/ShardSplittingQuery.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java b/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java index 92cbdff10fef3..da13d4ba15f4f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java @@ -117,6 +117,8 @@ public Scorer scorer(LeafReaderContext context) throws IOException { return new ConstantScoreScorer(this, score(), twoPhaseIterator); } else { // here we potentially guard the docID consumers with our parent bitset if we have one. + // this ensures that we are only marking root documents in the nested case and if necessary + // we do a second pass to mark the corresponding children in markChildDocs Function maybeWrapConsumer = consumer -> { if (parentBitSet != null) { return docId -> { @@ -317,6 +319,7 @@ public boolean matches() throws IOException { if (doc > nextParent) { // we only check once per nested/parent set nextParent = parentDocs.nextSetBit(doc); + // never check a child document against the visitor, they neihter have _id nor _routing as stored fields nextParentMatches = visitor.matches(nextParent); } return nextParentMatches;