Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ShardSplittingQuery to respect nested documents. #27398

Merged
merged 4 commits into from
Nov 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.elasticsearch.index.shard;

import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
Expand All @@ -33,19 +35,23 @@
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.TwoPhaseIterator;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BitSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.Uid;

import java.io.IOException;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.function.Predicate;

/**
Expand All @@ -56,16 +62,17 @@
final class ShardSplittingQuery extends Query {
private final IndexMetaData indexMetaData;
private final int shardId;
private final BitSetProducer nestedParentBitSetProducer;

ShardSplittingQuery(IndexMetaData indexMetaData, int shardId) {
ShardSplittingQuery(IndexMetaData indexMetaData, int shardId, boolean hasNested) {
if (indexMetaData.getCreationVersion().before(Version.V_6_0_0_rc2)) {
throw new IllegalArgumentException("Splitting query can only be executed on an index created with version "
+ Version.V_6_0_0_rc2 + " or higher");
}
this.indexMetaData = indexMetaData;
this.shardId = shardId;
this.nestedParentBitSetProducer = hasNested ? newParentDocBitSetProducer() : null;
}

@Override
public Weight createWeight(IndexSearcher searcher, boolean needsScores, float boost) {
return new ConstantScoreWeight(this, boost) {
Expand All @@ -84,44 +91,87 @@ public Scorer scorer(LeafReaderContext context) throws IOException {
Uid.decodeId(ref.bytes, ref.offset, ref.length), null);
return shardId == targetShardId;
};
if (terms == null) { // this is the common case - no partitioning and no _routing values
if (terms == null) {
// this is the common case - no partitioning and no _routing values
// in this case we also don't do anything special with regards to nested docs since we basically delete
// by ID and parent and nested all have the same id.
assert indexMetaData.isRoutingPartitionedIndex() == false;
findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, bitSet::set);
} else {
final BitSet parentBitSet;
if (nestedParentBitSetProducer == null) {
parentBitSet = null;
} else {
parentBitSet = nestedParentBitSetProducer.getBitSet(context);
if (parentBitSet == null) {
return null; // no matches
}
}
if (indexMetaData.isRoutingPartitionedIndex()) {
// this is the heaviest invariant. Here we have to visit all docs stored fields do extract _id and _routing
// this this index is routing partitioned.
Visitor visitor = new Visitor();
return new ConstantScoreScorer(this, score(),
new RoutingPartitionedDocIdSetIterator(leafReader, visitor));
Visitor visitor = new Visitor(leafReader);
TwoPhaseIterator twoPhaseIterator =
parentBitSet == null ? new RoutingPartitionedDocIdSetIterator(visitor) :
new NestedRoutingPartitionedDocIdSetIterator(visitor, parentBitSet);
return new ConstantScoreScorer(this, score(), twoPhaseIterator);
} else {
// here we potentially guard the docID consumers with our parent bitset if we have one.
// this ensures that we are only marking root documents in the nested case and if necessary
// we do a second pass to mark the corresponding children in markChildDocs
Function<IntConsumer, IntConsumer> maybeWrapConsumer = consumer -> {
if (parentBitSet != null) {
return docId -> {
if (parentBitSet.get(docId)) {
consumer.accept(docId);
}
};
}
return consumer;
};
// in the _routing case we first go and find all docs that have a routing value and mark the ones we have to delete
findSplitDocs(RoutingFieldMapper.NAME, ref -> {
int targetShardId = OperationRouting.generateShardId(indexMetaData, null, ref.utf8ToString());
return shardId == targetShardId;
}, leafReader, bitSet::set);
}, leafReader, maybeWrapConsumer.apply(bitSet::set));

// now if we have a mixed index where some docs have a _routing value and some don't we have to exclude the ones
// with a routing value from the next iteration an delete / select based on the ID.
if (terms.getDocCount() != leafReader.maxDoc()) {
// this is a special case where some of the docs have no routing values this sucks but it's possible today
FixedBitSet hasRoutingValue = new FixedBitSet(leafReader.maxDoc());
findSplitDocs(RoutingFieldMapper.NAME, ref -> false, leafReader,
hasRoutingValue::set);
findSplitDocs(RoutingFieldMapper.NAME, ref -> false, leafReader, maybeWrapConsumer.apply(hasRoutingValue::set));
IntConsumer bitSetConsumer = maybeWrapConsumer.apply(bitSet::set);
findSplitDocs(IdFieldMapper.NAME, includeInShard, leafReader, docId -> {
if (hasRoutingValue.get(docId) == false) {
bitSet.set(docId);
bitSetConsumer.accept(docId);
}
});
}
}
if (parentBitSet != null) {
// if nested docs are involved we also need to mark all child docs that belong to a matching parent doc.
markChildDocs(parentBitSet, bitSet);
}
}

return new ConstantScoreScorer(this, score(), new BitSetIterator(bitSet, bitSet.length()));
}


};
}

private void markChildDocs(BitSet parentDocs, BitSet matchingDocs) {
int currentDeleted = 0;
while (currentDeleted < matchingDocs.length() &&
(currentDeleted = matchingDocs.nextSetBit(currentDeleted)) != DocIdSetIterator.NO_MORE_DOCS) {
int previousParent = parentDocs.prevSetBit(Math.max(0, currentDeleted-1));
for (int i = previousParent + 1; i < currentDeleted; i++) {
matchingDocs.set(i);
}
currentDeleted++;
}
}

@Override
public String toString(String field) {
return "shard_splitting_query";
Expand All @@ -145,8 +195,8 @@ public int hashCode() {
return classHash() ^ result;
}

private static void findSplitDocs(String idField, Predicate<BytesRef> includeInShard,
LeafReader leafReader, IntConsumer consumer) throws IOException {
private static void findSplitDocs(String idField, Predicate<BytesRef> includeInShard, LeafReader leafReader,
IntConsumer consumer) throws IOException {
Terms terms = leafReader.terms(idField);
TermsEnum iterator = terms.iterator();
BytesRef idTerm;
Expand All @@ -162,15 +212,17 @@ private static void findSplitDocs(String idField, Predicate<BytesRef> includeInS
}
}

private static final class Visitor extends StoredFieldVisitor {
int leftToVisit = 2;
final BytesRef spare = new BytesRef();
String routing;
String id;
/* this class is a stored fields visitor that reads _id and/or _routing from the stored fields which is necessary in the case
of a routing partitioned index sine otherwise we would need to un-invert the _id and _routing field which is memory heavy */
private final class Visitor extends StoredFieldVisitor {
final LeafReader leafReader;
private int leftToVisit = 2;
private final BytesRef spare = new BytesRef();
private String routing;
private String id;

void reset() {
routing = id = null;
leftToVisit = 2;
Visitor(LeafReader leafReader) {
this.leafReader = leafReader;
}

@Override
Expand Down Expand Up @@ -210,36 +262,91 @@ public Status needsField(FieldInfo fieldInfo) throws IOException {
return leftToVisit == 0 ? Status.STOP : Status.NO;
}
}

boolean matches(int doc) throws IOException {
routing = id = null;
leftToVisit = 2;
leafReader.document(doc, this);
assert id != null : "docID must not be null - we might have hit a nested document";
int targetShardId = OperationRouting.generateShardId(indexMetaData, id, routing);
return targetShardId != shardId;
}
}

/**
* This two phase iterator visits every live doc and selects all docs that don't belong into this
* shard based on their id and routing value. This is only used in a routing partitioned index.
*/
private final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator {
private final LeafReader leafReader;
private static final class RoutingPartitionedDocIdSetIterator extends TwoPhaseIterator {
private final Visitor visitor;

RoutingPartitionedDocIdSetIterator(LeafReader leafReader, Visitor visitor) {
super(DocIdSetIterator.all(leafReader.maxDoc())); // we iterate all live-docs
this.leafReader = leafReader;
RoutingPartitionedDocIdSetIterator(Visitor visitor) {
super(DocIdSetIterator.all(visitor.leafReader.maxDoc())); // we iterate all live-docs
this.visitor = visitor;
}

@Override
public boolean matches() throws IOException {
return visitor.matches(approximation.docID());
}

@Override
public float matchCost() {
return 42; // that's obvious, right?
}
}

/**
* This TwoPhaseIterator marks all nested docs of matching parents as matches as well.
*/
private static final class NestedRoutingPartitionedDocIdSetIterator extends TwoPhaseIterator {
private final Visitor visitor;
private final BitSet parentDocs;
private int nextParent = -1;
private boolean nextParentMatches;

NestedRoutingPartitionedDocIdSetIterator(Visitor visitor, BitSet parentDocs) {
super(DocIdSetIterator.all(visitor.leafReader.maxDoc())); // we iterate all live-docs
this.parentDocs = parentDocs;
this.visitor = visitor;
}

@Override
public boolean matches() throws IOException {
// the educated reader might ask why this works, it does because all live doc ids (root docs and nested docs) are evaluated in
// order and that way we don't need to seek backwards as we do in other nested docs cases.
int doc = approximation.docID();
visitor.reset();
leafReader.document(doc, visitor);
int targetShardId = OperationRouting.generateShardId(indexMetaData, visitor.id, visitor.routing);
return targetShardId != shardId;
if (doc > nextParent) {
// we only check once per nested/parent set
nextParent = parentDocs.nextSetBit(doc);
// never check a child document against the visitor, they neihter have _id nor _routing as stored fields
nextParentMatches = visitor.matches(nextParent);
}
return nextParentMatches;
}

@Override
public float matchCost() {
return 42; // that's obvious, right?
}
}

/*
* this is used internally to obtain a bitset for parent documents. We don't cache this since we never access the same reader more
* than once. There is no point in using BitsetFilterCache#BitSetProducerWarmer since we use this only as a delete by query which is
* executed on a recovery-private index writer. There is no point in caching it and it won't have a cache hit either.
*/
private static BitSetProducer newParentDocBitSetProducer() {
return context -> {
Query query = Queries.newNonNestedFilter();
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
searcher.setQueryCache(null);
final Weight weight = searcher.createNormalizedWeight(query, false);
Scorer s = weight.scorer(context);
return s == null ? null : BitSet.of(s.iterator(), context.reader().maxDoc());
};
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
indexShard.mapperService().merge(sourceMetaData, MapperService.MergeReason.MAPPING_RECOVERY, true);
// now that the mapping is merged we can validate the index sort configuration.
Sort indexSort = indexShard.getIndexSort();
final boolean hasNested = indexShard.mapperService().hasNested();
final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards();
assert isSplit == false || sourceMetaData.getCreationVersion().onOrAfter(Version.V_6_0_0_alpha1) : "for split we require a " +
"single type but the index is created before 6.0.0";
Expand All @@ -127,7 +128,7 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
final long maxUnsafeAutoIdTimestamp =
shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong();
addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp,
indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit);
indexShard.indexSettings().getIndexMetaData(), indexShard.shardId().id(), isSplit, hasNested);
internalRecoverFromStore(indexShard);
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
Expand All @@ -142,8 +143,8 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
}

void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources,
final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split)
throws IOException {
final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split,
boolean hasNested) throws IOException {
final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
Expand All @@ -158,9 +159,8 @@ void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory ta

try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) {
writer.addIndexes(sources);

if (split) {
writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId));
writer.deleteDocuments(new ShardSplittingQuery(indexMetaData, shardId, hasNested));
}
/*
* We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on
Expand Down
Loading