diff --git a/server/src/main/java/org/apache/lucene/index/ShuflleForcedMergePolicy.java b/server/src/main/java/org/apache/lucene/index/ShuflleForcedMergePolicy.java new file mode 100644 index 0000000000000..48ecbad189a5e --- /dev/null +++ b/server/src/main/java/org/apache/lucene/index/ShuflleForcedMergePolicy.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.lucene.index; + +import org.elasticsearch.common.lucene.Lucene; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A {@link FilterMergePolicy} that interleaves eldest and newest segments picked by {@link MergePolicy#findForcedMerges} + * and {@link MergePolicy#findForcedDeletesMerges}. This allows time-based indices, that usually have the eldest documents + * first, to be efficient at finding the most recent documents too. + */ +public class ShuflleForcedMergePolicy extends FilterMergePolicy { + private static final String SHUFFLE_MERGE_KEY = "es.shuffle_merge"; + + public ShuflleForcedMergePolicy(MergePolicy in) { + super(in); + } + + /** + * Return true if the provided reader was merged with interleaved segments. + */ + public static boolean isInterleavedSegment(LeafReader reader) { + SegmentReader segReader = Lucene.segmentReader(reader); + return segReader.getSegmentInfo().info.getDiagnostics().containsKey(SHUFFLE_MERGE_KEY); + } + + + @Override + public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException { + return wrap(in.findForcedDeletesMerges(segmentInfos, mergeContext)); + } + + @Override + public MergeSpecification findForcedMerges(SegmentInfos segmentInfos, int maxSegmentCount, + Map segmentsToMerge, + MergeContext mergeContext) throws IOException { + return wrap(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, mergeContext)); + } + + private MergeSpecification wrap(MergeSpecification mergeSpec) throws IOException { + if (mergeSpec == null) { + return null; + } + MergeSpecification newMergeSpec = new MergeSpecification(); + for (OneMerge toWrap : mergeSpec.merges) { + List newInfos = interleaveList(toWrap.segments); + newMergeSpec.add(new OneMerge(newInfos) { + @Override + public CodecReader wrapForMerge(CodecReader reader) throws IOException { + return toWrap.wrapForMerge(reader); + } + + @Override + public void setMergeInfo(SegmentCommitInfo info) { + // Record that this merged segment is current as of this schemaGen: + Map copy = new HashMap<>(info.info.getDiagnostics()); + copy.put(SHUFFLE_MERGE_KEY, ""); + info.info.setDiagnostics(copy); + super.setMergeInfo(info); + } + }); + } + + return newMergeSpec; + } + + // Return a new list that sort segments of the original one by name (older first) + // and then interleave them to colocate oldest and most recent segments together. + private List interleaveList(List infos) throws IOException { + List newInfos = new ArrayList<>(infos.size()); + Collections.sort(infos, Comparator.comparing(a -> a.info.name)); + int left = 0; + int right = infos.size() - 1; + while (left <= right) { + SegmentCommitInfo leftInfo = infos.get(left); + if (left == right) { + newInfos.add(infos.get(left)); + } else { + SegmentCommitInfo rightInfo = infos.get(right); + // smaller segment first + if (leftInfo.sizeInBytes() < rightInfo.sizeInBytes()) { + newInfos.add(leftInfo); + newInfos.add(rightInfo); + } else { + newInfos.add(rightInfo); + newInfos.add(leftInfo); + } + } + left ++; + right --; + } + return newInfos; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1796d984c37ae..a5a4d4ef9dab2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -36,6 +36,7 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.ShuflleForcedMergePolicy; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.DocIdSetIterator; @@ -2224,6 +2225,10 @@ private IndexWriterConfig getIndexWriterConfig() { new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, softDeletesPolicy::getRetentionQuery, new PrunePostingsMergePolicy(mergePolicy, IdFieldMapper.NAME))); } + // We wrap the merge policy for all indices even though it is mostly useful for time-based indices + // but there should be no overhead for other type of indices so it's simpler than adding a setting + // to enable it. + mergePolicy = new ShuflleForcedMergePolicy(mergePolicy); iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); diff --git a/server/src/test/java/org/apache/lucene/index/ShuffleForcedMergePolicyTests.java b/server/src/test/java/org/apache/lucene/index/ShuffleForcedMergePolicyTests.java new file mode 100644 index 0000000000000..8033062f6d99b --- /dev/null +++ b/server/src/test/java/org/apache/lucene/index/ShuffleForcedMergePolicyTests.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.lucene.index; + +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.StringField; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.store.Directory; + +import java.io.IOException; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +public class ShuffleForcedMergePolicyTests extends BaseMergePolicyTestCase { + public void testDiagnostics() throws IOException { + try (Directory dir = newDirectory()) { + IndexWriterConfig iwc = newIndexWriterConfig(); + MergePolicy mp = new ShuflleForcedMergePolicy(newLogMergePolicy()); + iwc.setMergePolicy(mp); + boolean sorted = random().nextBoolean(); + if (sorted) { + iwc.setIndexSort(new Sort(new SortField("sort", SortField.Type.INT))); + } + int numDocs = atLeast(100); + + try (IndexWriter writer = new IndexWriter(dir, iwc)) { + for (int i = 0; i < numDocs; i++) { + if (i % 10 == 0) { + writer.flush(); + } + Document doc = new Document(); + doc.add(new StringField("id", "" + i, Field.Store.NO)); + doc.add(new NumericDocValuesField("sort", random().nextInt())); + writer.addDocument(doc); + } + try (DirectoryReader reader = DirectoryReader.open(writer)) { + assertThat(reader.leaves().size(), greaterThan(2)); + assertSegmentReaders(reader, leaf -> { + assertFalse(ShuflleForcedMergePolicy.isInterleavedSegment(leaf)); + }); + } + writer.forceMerge(1); + try (DirectoryReader reader = DirectoryReader.open(writer)) { + assertThat(reader.leaves().size(), equalTo(1)); + assertSegmentReaders(reader, leaf -> { + assertTrue(ShuflleForcedMergePolicy.isInterleavedSegment(leaf)); + }); + } + } + } + } + + private void assertSegmentReaders(DirectoryReader reader, Consumer checkLeaf) { + for (LeafReaderContext leaf : reader.leaves()) { + checkLeaf.accept(leaf.reader()); + } + } + + @Override + protected MergePolicy mergePolicy() { + return new ShuflleForcedMergePolicy(newLogMergePolicy()); + } + + @Override + protected void assertSegmentInfos(MergePolicy policy, SegmentInfos infos) throws IOException {} + + @Override + protected void assertMerge(MergePolicy policy, MergePolicy.MergeSpecification merge) throws IOException {} +} diff --git a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java index 9fb9445181327..0e0b4dbcacf1c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/PrunePostingsMergePolicyTests.java @@ -29,6 +29,7 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.ShuflleForcedMergePolicy; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.index.Terms; @@ -51,7 +52,7 @@ public void testPrune() throws IOException { iwc.setSoftDeletesField("_soft_deletes"); MergePolicy mp = new SoftDeletesRetentionMergePolicy("_soft_deletes", MatchAllDocsQuery::new, new PrunePostingsMergePolicy(newLogMergePolicy(), "id")); - iwc.setMergePolicy(mp); + iwc.setMergePolicy(new ShuflleForcedMergePolicy(mp)); boolean sorted = randomBoolean(); if (sorted) { iwc.setIndexSort(new Sort(new SortField("sort", SortField.Type.INT))); diff --git a/server/src/test/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicyTests.java index ef895e1a4ce8e..bd48f19a6365d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/RecoverySourcePruneMergePolicyTests.java @@ -34,6 +34,7 @@ import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.ShuflleForcedMergePolicy; import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.index.Term; import org.apache.lucene.search.DocIdSetIterator; @@ -57,7 +58,7 @@ public void testPruneAll() throws IOException { IndexWriterConfig iwc = newIndexWriterConfig(); RecoverySourcePruneMergePolicy mp = new RecoverySourcePruneMergePolicy("extra_source", MatchNoDocsQuery::new, newLogMergePolicy()); - iwc.setMergePolicy(mp); + iwc.setMergePolicy(new ShuflleForcedMergePolicy(mp)); try (IndexWriter writer = new IndexWriter(dir, iwc)) { for (int i = 0; i < 20; i++) { if (i > 0 && randomBoolean()) {