diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java b/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java index 94aee085175a0..da13d4ba15f4f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java @@ -19,9 +19,11 @@ package org.elasticsearch.index.shard; import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexReaderContext; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.ReaderUtil; import org.apache.lucene.index.StoredFieldVisitor; import org.apache.lucene.index.Terms; import org.apache.lucene.index.TermsEnum; @@ -33,19 +35,23 @@ import org.apache.lucene.search.Scorer; import org.apache.lucene.search.TwoPhaseIterator; import org.apache.lucene.search.Weight; +import org.apache.lucene.search.join.BitSetProducer; +import org.apache.lucene.util.BitSet; import org.apache.lucene.util.BitSetIterator; -import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.mapper.Uid; import java.io.IOException; +import java.util.function.Function; import java.util.function.IntConsumer; +import java.util.function.IntPredicate; import java.util.function.Predicate; /** @@ -56,16 +62,17 @@ final class ShardSplittingQuery extends Query { private final IndexMetaData indexMetaData; private final int shardId; + private final BitSetProducer nestedParentBitSetProducer; - ShardSplittingQuery(IndexMetaData indexMetaData, int shardId) { + ShardSplittingQuery(IndexMetaData indexMetaData, int shardId, boolean hasNested) { if (indexMetaData.getCreationVersion().before(Version.V_6_0_0_rc2)) { throw new IllegalArgumentException("Splitting query can only be executed on an index created with version " + Version.V_6_0_0_rc2 + " or higher"); } this.indexMetaData = indexMetaData; this.shardId = shardId; + this.nestedParentBitSetProducer = hasNested ? newParentDocBitSetProducer() : null; } - @Override public Weight createWeight(IndexSearcher searcher, boolean needsScores, float boost) { return new ConstantScoreWeight(this, boost) { @@ -84,44 +91,87 @@ public Scorer scorer(LeafReaderContext context) throws IOException { Uid.decodeId(ref.bytes, ref.offset, ref.length), null); return shardId == targetShardId; }; - if (terms == null) { // this is the common case - no partitioning and no _routing values + if (terms == null) { + // this is the common case - no partitioning and no _routing values + // in this case we also don't do anything special with regards to nested docs since we basically delete + // by ID and parent and nested all have the same id. 
assert indexMetaData.isRoutingPartitionedIndex() == false; findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, bitSet::set); } else { + final BitSet parentBitSet; + if (nestedParentBitSetProducer == null) { + parentBitSet = null; + } else { + parentBitSet = nestedParentBitSetProducer.getBitSet(context); + if (parentBitSet == null) { + return null; // no matches + } + } if (indexMetaData.isRoutingPartitionedIndex()) { // this is the heaviest invariant. Here we have to visit all docs' stored fields to extract _id and _routing // since this index is routing partitioned. - Visitor visitor = new Visitor(); - return new ConstantScoreScorer(this, score(), - new RoutingPartitionedDocIdSetIterator(leafReader, visitor)); + Visitor visitor = new Visitor(leafReader); + TwoPhaseIterator twoPhaseIterator = + parentBitSet == null ? new RoutingPartitionedDocIdSetIterator(visitor) : + new NestedRoutingPartitionedDocIdSetIterator(visitor, parentBitSet); + return new ConstantScoreScorer(this, score(), twoPhaseIterator); } else { + // here we potentially guard the docID consumers with our parent bitset if we have one. + // this ensures that we are only marking root documents in the nested case and if necessary + // we do a second pass to mark the corresponding children in markChildDocs + Function<IntConsumer, IntConsumer> maybeWrapConsumer = consumer -> { + if (parentBitSet != null) { + return docId -> { + if (parentBitSet.get(docId)) { + consumer.accept(docId); + } + }; + } + return consumer; + }; // in the _routing case we first go and find all docs that have a routing value and mark the ones we have to delete findSplitDocs(RoutingFieldMapper.NAME, ref -> { int targetShardId = OperationRouting.generateShardId(indexMetaData, null, ref.utf8ToString()); return shardId == targetShardId; - }, leafReader, bitSet::set); + }, leafReader, maybeWrapConsumer.apply(bitSet::set)); + // now if we have a mixed index where some docs have a _routing value and some don't we have to exclude the ones // with a routing value from the next iteration and delete / select based on the ID. if (terms.getDocCount() != leafReader.maxDoc()) { // this is a special case where some of the docs have no routing values; this sucks but it's possible today FixedBitSet hasRoutingValue = new FixedBitSet(leafReader.maxDoc()); - findSplitDocs(RoutingFieldMapper.NAME, ref -> false, leafReader, - hasRoutingValue::set); + findSplitDocs(RoutingFieldMapper.NAME, ref -> false, leafReader, maybeWrapConsumer.apply(hasRoutingValue::set)); + IntConsumer bitSetConsumer = maybeWrapConsumer.apply(bitSet::set); findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, docId -> { if (hasRoutingValue.get(docId) == false) { - bitSet.set(docId); + bitSetConsumer.accept(docId); } }); } } + if (parentBitSet != null) { + // if nested docs are involved we also need to mark all child docs that belong to a matching parent doc. 
+ markChildDocs(parentBitSet, bitSet); + } } + return new ConstantScoreScorer(this, score(), new BitSetIterator(bitSet, bitSet.length())); } - - }; } + private void markChildDocs(BitSet parentDocs, BitSet matchingDocs) { + int currentDeleted = 0; + while (currentDeleted < matchingDocs.length() && + (currentDeleted = matchingDocs.nextSetBit(currentDeleted)) != DocIdSetIterator.NO_MORE_DOCS) { + int previousParent = parentDocs.prevSetBit(Math.max(0, currentDeleted-1)); + for (int i = previousParent + 1; i < currentDeleted; i++) { + matchingDocs.set(i); + } + currentDeleted++; + } + } + @Override public String toString(String field) { return "shard_splitting_query"; @@ -145,8 +195,8 @@ public int hashCode() { return classHash() ^ result; } - private static void findSplitDocs(String idField, Predicate<BytesRef> includeInShard, - LeafReader leafReader, IntConsumer consumer) throws IOException { + private static void findSplitDocs(String idField, Predicate<BytesRef> includeInShard, LeafReader leafReader, + IntConsumer consumer) throws IOException { Terms terms = leafReader.terms(idField); TermsEnum iterator = terms.iterator(); BytesRef idTerm; @@ -162,15 +212,17 @@ private static void findSplitDocs(String idField, Predicate includeInS } } - private static final class Visitor extends StoredFieldVisitor { - int leftToVisit = 2; - final BytesRef spare = new BytesRef(); - String routing; - String id; + /* this class is a stored fields visitor that reads _id and/or _routing from the stored fields which is necessary in the case + of a routing partitioned index, since otherwise we would need to un-invert the _id and _routing field which is memory heavy */ + private final class Visitor extends StoredFieldVisitor { + final LeafReader leafReader; + private int leftToVisit = 2; + private final BytesRef spare = new BytesRef(); + private String routing; + private String id; - void reset() { - routing = id = null; - leftToVisit = 2; + Visitor(LeafReader leafReader) { + this.leafReader = leafReader; } @Override @@ -210,29 +262,67 @@ public Status needsField(FieldInfo fieldInfo) throws IOException { return leftToVisit == 0 ? Status.STOP : Status.NO; } } + + boolean matches(int doc) throws IOException { + routing = id = null; + leftToVisit = 2; + leafReader.document(doc, this); + assert id != null : "id must not be null - we might have hit a nested document"; + int targetShardId = OperationRouting.generateShardId(indexMetaData, id, routing); + return targetShardId != shardId; + } } /** * This two phase iterator visits every live doc and selects all docs that don't belong into this * shard based on their id and routing value. This is only used in a routing partitioned index. */ - private final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator { - private final LeafReader leafReader; + private static final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator { private final Visitor visitor; - RoutingPartitionedDocIdSetIterator(LeafReader leafReader, Visitor visitor) { - super(DocIdSetIterator.all(leafReader.maxDoc())); // we iterate all live-docs - this.leafReader = leafReader; + RoutingPartitionedDocIdSetIterator(Visitor visitor) { + super(DocIdSetIterator.all(visitor.leafReader.maxDoc())); // we iterate all live-docs this.visitor = visitor; } @Override public boolean matches() throws IOException { + return visitor.matches(approximation.docID()); + } + + @Override + public float matchCost() { + return 42; // that's obvious, right? 
+ } + } + + /** + * This TwoPhaseIterator marks all nested docs of matching parents as matches as well. + */ + private static final class NestedRoutingPartitionedDocIdSetIterator extends TwoPhaseIterator { + private final Visitor visitor; + private final BitSet parentDocs; + private int nextParent = -1; + private boolean nextParentMatches; + + NestedRoutingPartitionedDocIdSetIterator(Visitor visitor, BitSet parentDocs) { + super(DocIdSetIterator.all(visitor.leafReader.maxDoc())); // we iterate all live-docs + this.parentDocs = parentDocs; + this.visitor = visitor; + } + + @Override + public boolean matches() throws IOException { + // the educated reader might ask why this works; it does because all live doc ids (root docs and nested docs) are evaluated in + // order and that way we don't need to seek backwards as we do in other nested docs cases. int doc = approximation.docID(); - visitor.reset(); - leafReader.document(doc, visitor); - int targetShardId = OperationRouting.generateShardId(indexMetaData, visitor.id, visitor.routing); - return targetShardId != shardId; + if (doc > nextParent) { + // we only check once per nested/parent set + nextParent = parentDocs.nextSetBit(doc); + // never check a child document against the visitor, they have neither _id nor _routing as stored fields + nextParentMatches = visitor.matches(nextParent); + } + return nextParentMatches; } @Override @@ -240,6 +330,23 @@ public float matchCost() { return 42; // that's obvious, right? } } + + /* + * this is used internally to obtain a bitset for parent documents. We don't cache this since we never access the same reader more + * than once. There is no point in using BitsetFilterCache#BitSetProducerWarmer since we use this only as a delete by query which is + * executed on a recovery-private index writer. There is no point in caching it and it won't have a cache hit either. + */ + private static BitSetProducer newParentDocBitSetProducer() { + return context -> { + Query query = Queries.newNonNestedFilter(); + final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); + final IndexSearcher searcher = new IndexSearcher(topLevelContext); + searcher.setQueryCache(null); + final Weight weight = searcher.createNormalizedWeight(query, false); + Scorer s = weight.scorer(context); + return s == null ? null : BitSet.of(s.iterator(), context.reader().maxDoc()); + }; + } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index b59ab14961769..9216075e82238 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -115,6 +115,7 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate indexShard.mapperService().merge(sourceMetaData, MapperService.MergeReason.MAPPING_RECOVERY, true); // now that the mapping is merged we can validate the index sort configuration. 
Sort indexSort = indexShard.getIndexSort(); + final boolean hasNested = indexShard.mapperService().hasNested(); final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards(); assert isSplit == false || sourceMetaData.getCreationVersion().onOrAfter(Version.V_6_0_0_alpha1) : "for split we require a " + "single type but the index is created before 6.0.0"; @@ -127,7 +128,7 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate final long maxUnsafeAutoIdTimestamp = shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong(); addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp, - indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit); + indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit, hasNested); internalRecoverFromStore(indexShard); // just trigger a merge to do housekeeping on the // copied segments - we will also see them in stats etc. @@ -142,8 +143,8 @@ boolean recoverFromLocalShards(BiConsumer mappingUpdate } void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources, - final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split) - throws IOException { + final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split, + boolean hasNested) throws IOException { final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target); IndexWriterConfig iwc = new IndexWriterConfig(null) .setCommitOnClose(false) @@ -158,9 +159,8 @@ void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory ta try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) { writer.addIndexes(sources); - if (split) { - writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId)); + writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId, hasNested)); } /* * We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java index ebd647d0e02fd..995d86c3939ba 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java @@ -23,6 +23,7 @@ import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedSetSelector; import org.apache.lucene.search.SortedSetSortField; +import org.apache.lucene.search.join.ScoreMode; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -57,15 +58,23 @@ import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.VersionUtils; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.IntFunction; import java.util.stream.IntStream; +import static 
org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -78,13 +87,14 @@ protected Collection> nodePlugins() { return Arrays.asList(InternalSettingsPlugin.class); } - public void testCreateSplitIndexToN() { + public void testCreateSplitIndexToN() throws IOException { int[][] possibleShardSplits = new int[][] {{2,4,8}, {3, 6, 12}, {1, 2, 4}}; int[] shardSplits = randomFrom(possibleShardSplits); assertEquals(shardSplits[0], (shardSplits[0] * shardSplits[1]) / shardSplits[1]); assertEquals(shardSplits[1], (shardSplits[1] * shardSplits[2]) / shardSplits[2]); internalCluster().ensureAtLeastNumDataNodes(2); final boolean useRouting = randomBoolean(); + final boolean useNested = randomBoolean(); final boolean useMixedRouting = useRouting ? randomBoolean() : false; CreateIndexRequestBuilder createInitialIndex = prepareCreate("source"); final int routingShards = shardSplits[2] * randomIntBetween(1, 10); @@ -93,16 +103,43 @@ public void testCreateSplitIndexToN() { .put("index.number_of_routing_shards", routingShards); if (useRouting && useMixedRouting == false && randomBoolean()) { settings.put("index.routing_partition_size", randomIntBetween(1, routingShards - 1)); - createInitialIndex.addMapping("t1", "_routing", "required=true"); + if (useNested) { + createInitialIndex.addMapping("t1", "_routing", "required=true", "nested1", "type=nested"); + } else { + createInitialIndex.addMapping("t1", "_routing", "required=true"); + } + } else if (useNested) { + createInitialIndex.addMapping("t1", "nested1", "type=nested"); } - logger.info("use routing {} use mixed routing {}", useRouting, useMixedRouting); + logger.info("use routing {} use mixed routing {} use nested {}", useRouting, useMixedRouting, useNested); createInitialIndex.setSettings(settings).get(); int numDocs = randomIntBetween(10, 50); String[] routingValue = new String[numDocs]; + + BiFunction indexFunc = (index, id) -> { + try { + return client().prepareIndex(index, "t1", Integer.toString(id)) + .setSource(jsonBuilder().startObject() + .field("foo", "bar") + .field("i", id) + .startArray("nested1") + .startObject() + .field("n_field1", "n_value1_1") + .field("n_field2", "n_value2_1") + .endObject() + .startObject() + .field("n_field1", "n_value1_2") + .field("n_field2", "n_value2_2") + .endObject() + .endArray() + .endObject()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }; for (int i = 0; i < numDocs; i++) { - IndexRequestBuilder builder = client().prepareIndex("source", "t1", Integer.toString(i)) - .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON); + IndexRequestBuilder builder = indexFunc.apply("source", i); if (useRouting) { String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); if (useMixedRouting && randomBoolean()) { @@ -118,8 +155,7 @@ public void testCreateSplitIndexToN() { if (randomBoolean()) { for (int i = 0; i < numDocs; i++) { // let's introduce some updates / deletes on the index if (randomBoolean()) { - 
IndexRequestBuilder builder = client().prepareIndex("source", "t1", Integer.toString(i)) - .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON); + IndexRequestBuilder builder = indexFunc.apply("source", i); if (useRouting) { builder.setRouting(routingValue[i]); } @@ -145,8 +181,7 @@ public void testCreateSplitIndexToN() { assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); for (int i = 0; i < numDocs; i++) { // now update - IndexRequestBuilder builder = client().prepareIndex("first_split", "t1", Integer.toString(i)) - .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON); + IndexRequestBuilder builder = indexFunc.apply("first_split", i); if (useRouting) { builder.setRouting(routingValue[i]); } @@ -180,8 +215,7 @@ public void testCreateSplitIndexToN() { assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); for (int i = 0; i < numDocs; i++) { // now update - IndexRequestBuilder builder = client().prepareIndex("second_split", "t1", Integer.toString(i)) - .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON); + IndexRequestBuilder builder = indexFunc.apply("second_split", i); if (useRouting) { builder.setRouting(routingValue[i]); } @@ -195,14 +229,25 @@ public void testCreateSplitIndexToN() { assertHitCount(client().prepareSearch("second_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); assertHitCount(client().prepareSearch("first_split").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); - + if (useNested) { + assertNested("source", numDocs); + assertNested("first_split", numDocs); + assertNested("second_split", numDocs); + } assertAllUniqueDocs(client().prepareSearch("second_split").setSize(100) .setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); assertAllUniqueDocs(client().prepareSearch("first_split").setSize(100) .setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); assertAllUniqueDocs(client().prepareSearch("source").setSize(100) .setQuery(new TermsQueryBuilder("foo", "bar")).get(), numDocs); + } + public void assertNested(String index, int numDocs) { + // now, do a nested query + SearchResponse searchResponse = client().prepareSearch(index).setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", + "n_value1_1"), ScoreMode.Avg)).get(); + assertNoFailures(searchResponse); + assertThat(searchResponse.getHits().getTotalHits(), equalTo((long)numDocs)); } public void assertAllUniqueDocs(SearchResponse response, int numDocs) { diff --git a/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java b/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java index 7351372620fc9..c0b492b0cb67d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/ShardSplittingQueryTests.java @@ -23,6 +23,7 @@ import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexableField; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.RandomIndexWriter; import org.apache.lucene.index.SortedNumericDocValues; @@ -38,10 +39,12 @@ 
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.RoutingFieldMapper; +import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -58,18 +61,36 @@ public void testSplitOnID() throws IOException { .setRoutingNumShards(numShards * 1000000) .numberOfReplicas(0).build(); int targetShardId = randomIntBetween(0, numShards-1); + boolean hasNested = randomBoolean(); for (int j = 0; j < numDocs; j++) { int shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), null); - writer.addDocument(Arrays.asList( - new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), - new SortedNumericDocValuesField("shard_id", shardId) - )); + if (hasNested) { + List> docs = new ArrayList<>(); + int numNested = randomIntBetween(0, 10); + for (int i = 0; i < numNested; i++) { + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + writer.addDocuments(docs); + } else { + writer.addDocument(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } } writer.commit(); writer.close(); - assertSplit(dir, metaData, targetShardId); + assertSplit(dir, metaData, targetShardId, hasNested); dir.close(); } @@ -83,19 +104,38 @@ public void testSplitOnRouting() throws IOException { .numberOfShards(numShards) .setRoutingNumShards(numShards * 1000000) .numberOfReplicas(0).build(); + boolean hasNested = randomBoolean(); int targetShardId = randomIntBetween(0, numShards-1); for (int j = 0; j < numDocs; j++) { String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 5); final int shardId = OperationRouting.generateShardId(metaData, null, routing); - writer.addDocument(Arrays.asList( - new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), - new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), - new SortedNumericDocValuesField("shard_id", shardId) - )); + if (hasNested) { + List> docs = new ArrayList<>(); + int numNested = randomIntBetween(0, 10); + for (int i = 0; i < numNested; i++) { + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + writer.addDocuments(docs); + } else { + writer.addDocument(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } } 
writer.commit(); writer.close(); - assertSplit(dir, metaData, targetShardId); + assertSplit(dir, metaData, targetShardId, hasNested); dir.close(); } @@ -103,33 +143,52 @@ public void testSplitOnIdOrRouting() throws IOException { Directory dir = newFSDirectory(createTempDir()); final int numDocs = randomIntBetween(50, 100); RandomIndexWriter writer = new RandomIndexWriter(random(), dir); - int numShards = randomIntBetween(2, 10); + int numShards = randomIntBetween(2, 10); IndexMetaData metaData = IndexMetaData.builder("test") .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) .numberOfShards(numShards) .setRoutingNumShards(numShards * 1000000) .numberOfReplicas(0).build(); + boolean hasNested = randomBoolean(); int targetShardId = randomIntBetween(0, numShards-1); for (int j = 0; j < numDocs; j++) { + Iterable rootDoc; + final int shardId; if (randomBoolean()) { String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 5); - final int shardId = OperationRouting.generateShardId(metaData, null, routing); - writer.addDocument(Arrays.asList( + shardId = OperationRouting.generateShardId(metaData, null, routing); + rootDoc = Arrays.asList( new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), new SortedNumericDocValuesField("shard_id", shardId) - )); + ); } else { - int shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), null); - writer.addDocument(Arrays.asList( + shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), null); + rootDoc = Arrays.asList( new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), new SortedNumericDocValuesField("shard_id", shardId) - )); + ); + } + + if (hasNested) { + List> docs = new ArrayList<>(); + int numNested = randomIntBetween(0, 10); + for (int i = 0; i < numNested; i++) { + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } + docs.add(rootDoc); + writer.addDocuments(docs); + } else { + writer.addDocument(rootDoc); } } writer.commit(); writer.close(); - assertSplit(dir, metaData, targetShardId); + assertSplit(dir, metaData, targetShardId, hasNested); dir.close(); } @@ -145,47 +204,94 @@ public void testSplitOnRoutingPartitioned() throws IOException { .setRoutingNumShards(numShards * 1000000) .routingPartitionSize(randomIntBetween(1, 10)) .numberOfReplicas(0).build(); + boolean hasNested = randomBoolean(); int targetShardId = randomIntBetween(0, numShards-1); for (int j = 0; j < numDocs; j++) { String routing = randomRealisticUnicodeOfCodepointLengthBetween(1, 5); final int shardId = OperationRouting.generateShardId(metaData, Integer.toString(j), routing); - writer.addDocument(Arrays.asList( - new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), - new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), - new SortedNumericDocValuesField("shard_id", shardId) - )); + + if (hasNested) { + List> docs = new ArrayList<>(); + int numNested = randomIntBetween(0, 10); + for (int i = 0; i < numNested; i++) { + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(TypeFieldMapper.NAME, "__nested", Field.Store.YES), + new 
SortedNumericDocValuesField("shard_id", shardId) + )); + } + docs.add(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + writer.addDocuments(docs); + } else { + writer.addDocument(Arrays.asList( + new StringField(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(j)), Field.Store.YES), + new StringField(RoutingFieldMapper.NAME, routing, Field.Store.YES), + new SortedNumericDocValuesField("shard_id", shardId) + )); + } } writer.commit(); writer.close(); - assertSplit(dir, metaData, targetShardId); + assertSplit(dir, metaData, targetShardId, hasNested); dir.close(); } - void assertSplit(Directory dir, IndexMetaData metaData, int targetShardId) throws IOException { + void assertSplit(Directory dir, IndexMetaData metaData, int targetShardId, boolean hasNested) throws IOException { try (IndexReader reader = DirectoryReader.open(dir)) { IndexSearcher searcher = new IndexSearcher(reader); searcher.setQueryCache(null); final boolean needsScores = false; - final Weight splitWeight = searcher.createNormalizedWeight(new ShardSplittingQuery(metaData, targetShardId), needsScores); + final Weight splitWeight = searcher.createNormalizedWeight(new ShardSplittingQuery(metaData, targetShardId, hasNested), + needsScores); final List leaves = reader.leaves(); for (final LeafReaderContext ctx : leaves) { Scorer scorer = splitWeight.scorer(ctx); DocIdSetIterator iterator = scorer.iterator(); SortedNumericDocValues shard_id = ctx.reader().getSortedNumericDocValues("shard_id"); - int doc; - while ((doc = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { - while (shard_id.nextDoc() < doc) { + int numExpected = 0; + while (shard_id.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (targetShardId == shard_id.nextValue()) { + numExpected++; + } + } + if (numExpected == ctx.reader().maxDoc()) { + // all docs belong in this shard + assertEquals(DocIdSetIterator.NO_MORE_DOCS, iterator.nextDoc()); + } else { + shard_id = ctx.reader().getSortedNumericDocValues("shard_id"); + int doc; + int numActual = 0; + int lastDoc = 0; + while ((doc = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) { + lastDoc = doc; + while (shard_id.nextDoc() < doc) { + long shardID = shard_id.nextValue(); + assertEquals(shardID, targetShardId); + numActual++; + } + assertEquals(shard_id.docID(), doc); long shardID = shard_id.nextValue(); - assertEquals(shardID, targetShardId); + BytesRef id = reader.document(doc).getBinaryValue("_id"); + String actualId = Uid.decodeId(id.bytes, id.offset, id.length); + assertNotEquals(ctx.reader() + " docID: " + doc + " actualID: " + actualId, shardID, targetShardId); + } + if (lastDoc < ctx.reader().maxDoc()) { + // check the last docs in the segment and make sure they all have the right shard id + while (shard_id.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + long shardID = shard_id.nextValue(); + assertEquals(shardID, targetShardId); + numActual++; + } } - assertEquals(shard_id.docID(), doc); - long shardID = shard_id.nextValue(); - BytesRef id = reader.document(doc).getBinaryValue("_id"); - String actualId = Uid.decodeId(id.bytes, id.offset, id.length); - assertNotEquals(ctx.reader() + " docID: " + doc + " actualID: " + actualId, shardID, targetShardId); + + assertEquals(numExpected, numActual); } } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java 
b/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java index 05b092ff3a461..61d6a5a845bf8 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java @@ -100,7 +100,7 @@ public void testAddIndices() throws IOException { Directory target = newFSDirectory(createTempDir()); final long maxSeqNo = randomNonNegativeLong(); final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong(); - storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo, maxUnsafeAutoIdTimestamp, null, 0, false); + storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo, maxUnsafeAutoIdTimestamp, null, 0, false, false); int numFiles = 0; Predicate filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false && f.startsWith("extra") == false; @@ -174,7 +174,7 @@ public void testSplitShard() throws IOException { .setRoutingNumShards(numShards * 1000000) .numberOfReplicas(0).build(); storeRecovery.addIndices(indexStats, target, indexSort, new Directory[] {dir}, maxSeqNo, maxUnsafeAutoIdTimestamp, metaData, - targetShardId, true); + targetShardId, true, false); SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(target);
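Side note on the child-marking pass in ShardSplittingQuery#markChildDocs: the following is a minimal, standalone sketch (not part of the patch) showing how the two bitsets interact. It assumes Lucene's nested-document convention that child documents are indexed immediately before their root document, so for every matching root we can simply mark the range between the previous root and the matching one. The class name MarkChildDocsSketch and the toy doc ids are made up for illustration only.

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.FixedBitSet;

public class MarkChildDocsSketch {
    public static void main(String[] args) {
        // doc ids 0..2 are nested children of the root at 3, ids 4..5 are children of the root at 6
        FixedBitSet parentDocs = new FixedBitSet(7);
        parentDocs.set(3);
        parentDocs.set(6);

        // only the first root matched the split query (i.e. it moves to another shard)
        FixedBitSet matchingDocs = new FixedBitSet(7);
        matchingDocs.set(3);

        // same loop as markChildDocs: for every matching root, mark the child range before it
        int current = 0;
        while (current < matchingDocs.length()
                && (current = matchingDocs.nextSetBit(current)) != DocIdSetIterator.NO_MORE_DOCS) {
            int previousParent = parentDocs.prevSetBit(Math.max(0, current - 1));
            for (int i = previousParent + 1; i < current; i++) {
                matchingDocs.set(i); // child belongs to a matching root, delete it as well
            }
            current++;
        }

        for (int i = 0; i < 7; i++) {
            // prints true for 0..3 (root 3 plus its children) and false for 4..6
            System.out.println(i + " -> " + matchingDocs.get(i));
        }
    }
}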