Fix ShardSplittingQuery to respect nested documents #27398
Changes from 2 commits
@@ -19,9 +19,11 @@
 package org.elasticsearch.index.shard;

 import org.apache.lucene.index.FieldInfo;
+import org.apache.lucene.index.IndexReaderContext;
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.PostingsEnum;
+import org.apache.lucene.index.ReaderUtil;
 import org.apache.lucene.index.StoredFieldVisitor;
 import org.apache.lucene.index.Terms;
 import org.apache.lucene.index.TermsEnum;
@@ -33,19 +35,22 @@
 import org.apache.lucene.search.Scorer;
 import org.apache.lucene.search.TwoPhaseIterator;
 import org.apache.lucene.search.Weight;
+import org.apache.lucene.search.join.BitSetProducer;
 import org.apache.lucene.util.BitSet;
 import org.apache.lucene.util.BitSetIterator;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.FixedBitSet;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.OperationRouting;
+import org.elasticsearch.common.lucene.search.Queries;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.mapper.Uid;

 import java.io.IOException;
 import java.util.function.IntConsumer;
+import java.util.function.IntPredicate;
 import java.util.function.Predicate;

 /**
@@ -56,16 +61,21 @@
 final class ShardSplittingQuery extends Query {
     private final IndexMetaData indexMetaData;
     private final int shardId;
+    private final BitSetProducer nestedParentBitSetProducer;

-    ShardSplittingQuery(IndexMetaData indexMetaData, int shardId) {
+    ShardSplittingQuery(IndexMetaData indexMetaData, int shardId, boolean hasNested) {
+        this(indexMetaData, shardId, hasNested ? newParentDocBitSetProducer() : null);
+    }
+
+    private ShardSplittingQuery(IndexMetaData indexMetaData, int shardId, BitSetProducer nestedParentBitSetProducer) {
         if (indexMetaData.getCreationVersion().before(Version.V_6_0_0_rc2)) {
             throw new IllegalArgumentException("Splitting query can only be executed on an index created with version "
                 + Version.V_6_0_0_rc2 + " or higher");
         }
         this.indexMetaData = indexMetaData;
         this.shardId = shardId;
+        this.nestedParentBitSetProducer = nestedParentBitSetProducer;
     }
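For context, a hedged sketch of how a caller might construct the new variant; the indexShard.mapperService().hasNested() call site is an assumption for illustration and is not part of this diff:

    // Hypothetical call site (not in this diff): the caller decides, from the
    // mappings, whether the index being split contains nested documents.
    ShardSplittingQuery query = new ShardSplittingQuery(indexMetaData, targetShardId,
        indexShard.mapperService().hasNested());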

     @Override
     public Weight createWeight(IndexSearcher searcher, boolean needsScores, float boost) {
         return new ConstantScoreWeight(this, boost) {
@@ -84,44 +94,85 @@ public Scorer scorer(LeafReaderContext context) throws IOException {
                         Uid.decodeId(ref.bytes, ref.offset, ref.length), null);
                     return shardId == targetShardId;
                 };
-                if (terms == null) { // this is the common case - no partitioning and no _routing values
+                if (terms == null) {
+                    // this is the common case - no partitioning and no _routing values
+                    // in this case we also don't do anything special with regards to nested docs since we basically delete
+                    // by ID, and parent and nested docs all have the same id.
                     assert indexMetaData.isRoutingPartitionedIndex() == false;
                     findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, bitSet::set);
                 } else {
+                    final BitSet parentBitSet;
+                    final IntPredicate includeDoc;
+                    if (nestedParentBitSetProducer == null) {
+                        parentBitSet = null;
+                        includeDoc = i -> true;
+                    } else {
+                        parentBitSet = nestedParentBitSetProducer.getBitSet(context);
+                        if (parentBitSet == null) {
+                            return null; // no matches
+                        }
+                        includeDoc = parentBitSet::get;
+                    }
                     if (indexMetaData.isRoutingPartitionedIndex()) {
                         // this is the heaviest invariant. Here we have to visit all docs' stored fields to extract _id and _routing
                         // since this index is routing partitioned.
                         Visitor visitor = new Visitor();
-                        return new ConstantScoreScorer(this, score(),
-                            new RoutingPartitionedDocIdSetIterator(leafReader, visitor));
+                        TwoPhaseIterator twoPhaseIterator =
+                            parentBitSet == null ? new RoutingPartitionedDocIdSetIterator(leafReader, visitor) :
+                                new NestedRoutingPartitionedDocIdSetIterator(leafReader, visitor, parentBitSet);
+                        return new ConstantScoreScorer(this, score(), twoPhaseIterator);
                     } else {
                         // in the _routing case we first go and find all docs that have a routing value and mark the ones we have to delete
                         findSplitDocs(RoutingFieldMapper.NAME, ref -> {
                             int targetShardId = OperationRouting.generateShardId(indexMetaData, null, ref.utf8ToString());
                             return shardId == targetShardId;
-                        }, leafReader, bitSet::set);
+                        }, leafReader, docId -> {
+                            if (includeDoc.test(docId)) {
+                                bitSet.set(docId);
+                            }
+                        });

                         // now if we have a mixed index where some docs have a _routing value and some don't, we have to exclude the ones
                         // with a routing value from the next iteration and delete / select based on the ID.
                         if (terms.getDocCount() != leafReader.maxDoc()) {
                             // this is a special case where some of the docs have no routing values; this sucks but it's possible today
                             FixedBitSet hasRoutingValue = new FixedBitSet(leafReader.maxDoc());
                             findSplitDocs(RoutingFieldMapper.NAME, ref -> false, leafReader,
-                                hasRoutingValue::set);
+                                docId -> {
+                                    if (includeDoc.test(docId)) {
+                                        hasRoutingValue.set(docId);
+                                    }
+                                });

                             findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, docId -> {
-                                if (hasRoutingValue.get(docId) == false) {
+                                if (hasRoutingValue.get(docId) == false && includeDoc.test(docId)) {
                                     bitSet.set(docId);
                                 }
                             });
                         }
                     }
+                    if (parentBitSet != null) {
+                        markChildDocs(parentBitSet, bitSet);
+                    }
                 }

                 return new ConstantScoreScorer(this, score(), new BitSetIterator(bitSet, bitSet.length()));
             }
         };
     }

+    private void markChildDocs(BitSet parentDocs, BitSet deletedDocs) {
[Review comment] Why deletedDocs? It's the opposite, no? The bitset of the matching parent+children, allDocs?
[Reply] Naming issue, I will fix. In theory it will hold all docs that need to be deleted by the IndexWriter.
+        int currentDeleted = 0;
+        while (currentDeleted < deletedDocs.length() &&
+            (currentDeleted = deletedDocs.nextSetBit(currentDeleted)) != DocIdSetIterator.NO_MORE_DOCS) {
+            int previousParent = parentDocs.prevSetBit(Math.max(0, currentDeleted - 1));
+            for (int i = previousParent + 1; i < currentDeleted; i++) {
+                deletedDocs.set(i);
+            }
+            currentDeleted++;
+        }
+    }
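To make the naming discussion above concrete, a toy walk-through of markChildDocs, assuming the method as written above; the doc IDs and block layout are invented for illustration. In Lucene's nested (block-join) layout, child docs always precede their parent:

    // Block layout (hypothetical): [c0 c1 P2] [c3 c4 P5] [c6 c7 c8 P9]
    FixedBitSet parentDocs = new FixedBitSet(10);
    parentDocs.set(2); parentDocs.set(5); parentDocs.set(9);

    FixedBitSet deletedDocs = new FixedBitSet(10);
    deletedDocs.set(5); // only parent 5 was selected for deletion

    markChildDocs(parentDocs, deletedDocs);
    // nextSetBit(0) finds 5; prevSetBit(4) finds the previous parent 2, so the
    // loop sets docs 3 and 4 (the children of parent 5): deletedDocs == {3, 4, 5}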

     @Override
     public String toString(String field) {
         return "shard_splitting_query";
@@ -216,7 +267,7 @@ public Status needsField(FieldInfo fieldInfo) throws IOException {
      * This two phase iterator visits every live doc and selects all docs that don't belong into this
      * shard based on their id and routing value. This is only used in a routing partitioned index.
      */
-    private final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator {
+    private class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator {
         private final LeafReader leafReader;
         private final Visitor visitor;
@@ -228,7 +279,10 @@ private final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator

         @Override
         public boolean matches() throws IOException {
-            int doc = approximation.docID();
+            return innerMatches(approximation.docID());
+        }
+
+        protected boolean innerMatches(int doc) throws IOException {
             visitor.reset();
             leafReader.document(doc, visitor);
             int targetShardId = OperationRouting.generateShardId(indexMetaData, visitor.id, visitor.routing);
@@ -240,6 +294,46 @@ public float matchCost() {
             return 42; // that's obvious, right?
         }
     }

+    /**
+     * This TwoPhaseIterator marks all nested docs of matching parents as matches as well.
+     */
+    private final class NestedRoutingPartitionedDocIdSetIterator extends RoutingPartitionedDocIdSetIterator {
+        private final BitSet parentDocs;
+        private int nextParent = -1;
+        private boolean nextParentMatches;
+
+        NestedRoutingPartitionedDocIdSetIterator(LeafReader leafReader, Visitor visitor, BitSet parentDocs) {
+            super(leafReader, visitor);
+            this.parentDocs = parentDocs;
+        }
+
+        @Override
+        public boolean matches() throws IOException {
[Review comment] I was trying to understand why this works, because of the forward iteration here (with nested, we usually seek backwards …)
[Reply] correct, I will leave a comment
+            int doc = approximation.docID();
+            if (doc > nextParent) {
+                nextParent = parentDocs.nextSetBit(doc);
+                nextParentMatches = innerMatches(nextParent);
+            }
+            return nextParentMatches;
+        }
+    }
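On the forward-iteration question above: children always have lower doc IDs than their parent, and a delete-by-query consumes doc IDs in increasing order, so nextSetBit(doc) from any doc in a block lands on that block's own parent and the cached answer stays valid for the rest of the block. A walk-through with invented doc IDs (block layout [c0 c1 P2] [c3 c4 P5]):

    // matches() at doc 0: 0 > -1, so nextParent = parentDocs.nextSetBit(0) == 2
    //                     and nextParentMatches = innerMatches(2) is cached
    // matches() at doc 1: 1 > 2 is false -> reuse the cached answer for parent 2
    // matches() at doc 2: 2 > 2 is false -> the parent itself reuses it as well
    // matches() at doc 3: 3 > 2, so nextParent = nextSetBit(3) == 5 and the
    //                     match decision is recomputed for the next block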

+    /*
+     * this is used internally to obtain a bitset for parent documents. We don't cache this since we never access the same reader more
+     * than once.
+     */
[Review comment] We warm this query per segment in …
[Reply] we use this only as a delete by query which is executed on a …
[Reply] I left a comment
[Reply] Ok I missed the recovery-private thing. Thanks
[Review comment] 👍

+    private static BitSetProducer newParentDocBitSetProducer() {
+        return context -> {
+            Query query = Queries.newNonNestedFilter();
+            final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
+            final IndexSearcher searcher = new IndexSearcher(topLevelContext);
+            searcher.setQueryCache(null);
+            final Weight weight = searcher.createNormalizedWeight(query, false);
+            Scorer s = weight.scorer(context);
+            return s == null ? null : BitSet.of(s.iterator(), context.reader().maxDoc());
+        };
+    }
 }
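A minimal sketch of how such a producer is consumed per segment; the helper name and the IndexReader parameter are illustrative only (in this class the scorer calls getBitSet(context) directly, once per leaf):

    // Illustrative only: each leaf computes its parent bitset on the fly, uncached,
    // which is fine here because the recovery-private reader is visited once.
    static void visitParents(org.apache.lucene.index.IndexReader reader) throws IOException {
        BitSetProducer producer = newParentDocBitSetProducer();
        for (LeafReaderContext leaf : reader.leaves()) {
            BitSet parents = producer.getBitSet(leaf); // null if the segment has no parent docs
            // ... hand off to NestedRoutingPartitionedDocIdSetIterator / markChildDocs
        }
    }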
[Review comment] So just to double check, the includeDoc is to ensure that only root docs get selected, and later we select the nested docs of the selected root docs in markChildDocs(...), right?
[Reply] correct
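Putting the pieces together, a sketch of the two-pass selection with the same invented doc IDs as above (parents at {2, 5, 9}, parent 5 routed away from this shard):

    // Pass 1: findSplitDocs evaluates _routing/_id, but includeDoc (parentBitSet::get)
    //         restricts marking to root docs            -> bitSet == {5}
    // Pass 2: markChildDocs(parentBitSet, bitSet) extends each marked parent to
    //         its preceding children                    -> bitSet == {3, 4, 5}
    // The IndexWriter then deletes the whole nested block in one go.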