Add prefetching support to stored fields. #13424
Merged
Conversation
This adds `StoredFields#prefetch(int)`, which mostly delegates to `IndexInput#prefetch`. Callers can take advantage of this API to parallelize I/O across multiple stored documents by first calling `StoredFields#prefetch` on all doc IDs, and only then calling `StoredFields#document` on each of them. I also added a cache of recently prefetched blocks to the default codec, in order to avoid prefetching the same block multiple times within a short period of time. This felt sensible given that doc ID reordering via recursive graph bisection, or index sorting, is likely to result in search results being clustered.
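For illustration, here is a minimal sketch of the prefetch-then-read pattern this API enables; the `fetchDocuments` helper, the `reader`, and the `topDocIds` array are assumptions standing in for the surrounding search code, not part of this change:

```java
import java.io.IOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.StoredFields;

class PrefetchExample {
  // Hypothetical helper showing the intended two-pass calling pattern.
  static void fetchDocuments(IndexReader reader, int[] topDocIds) throws IOException {
    StoredFields storedFields = reader.storedFields();
    // First pass: issue prefetches so I/O for all documents can proceed in parallel.
    for (int doc : topDocIds) {
      storedFields.prefetch(doc);
    }
    // Second pass: actual reads, which should now mostly hit the page cache.
    for (int doc : topDocIds) {
      Document d = storedFields.document(doc);
      // ... consume d ...
    }
  }
}
```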
As with previous changes, I wrote a synthetic benchmark to make sure that this new API actually helps. The benchmark simulates fetching 20 random stored documents in parallel. The index it creates is 39GB, while my page cache only has a capacity of 25GB.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
public class StoredFieldsPrefetchBench {

  public static int DUMMY;

  public static void main(String[] args) throws Exception {
    Path dirPath = Paths.get(args[0]);
    Directory dir = FSDirectory.open(dirPath);
    if (DirectoryReader.indexExists(dir) == false) {
      // Disable compound files so that stored fields get their own file on disk.
      MergePolicy mergePolicy = new FilterMergePolicy(new TieredMergePolicy()) {
        @Override
        public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
            throws IOException {
          return false;
        }
      };
      try (IndexWriter w =
          new IndexWriter(dir, new IndexWriterConfig().setUseCompoundFile(false).setMergePolicy(mergePolicy))) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicLong indexed = new AtomicLong(0);
        for (int task = 0; task < 1000; ++task) {
          executor.execute(() -> {
            Random r = ThreadLocalRandom.current();
            for (int i = 0; i < 40_000; ++i) {
              // Each document stores 1KB of random, hence incompressible, bytes.
              Document doc = new Document();
              byte[] bytes = new byte[1024];
              r.nextBytes(bytes);
              doc.add(new StoredField("content", bytes));
              try {
                w.addDocument(doc);
              } catch (IOException e) {
                throw new UncheckedIOException(e);
              }
              final long actualIndexed = indexed.incrementAndGet();
              if (actualIndexed % 1_000_000 == 0) {
                System.out.println("Indexed: " + actualIndexed);
                try {
                  DirectoryReader.open(w).close();
                } catch (IOException e) {
                  throw new UncheckedIOException(e);
                }
              }
            }
          });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
        w.commit();
        System.out.println("Done indexing");
      }
    }
    List<Long> latencies = new ArrayList<>();
    try (IndexReader reader = DirectoryReader.open(dir)) {
      Random r = ThreadLocalRandom.current();
      for (int i = 0; i < 10_000; ++i) {
        StoredFields storedFields = reader.storedFields();
        long start = System.nanoTime();
        int[] ids = new int[20];
        for (int j = 0; j < ids.length; ++j) {
          ids[j] = r.nextInt(reader.maxDoc());
        }
        // Prefetch all documents up front so that their I/O can run in parallel.
        for (int doc : ids) {
          storedFields.prefetch(doc);
        }
        // Then read them; the hashCode keeps the JIT from eliding the reads.
        for (int doc : ids) {
          DUMMY += storedFields.document(doc).getBinaryValue("content").hashCode();
        }
        long end = System.nanoTime();
        latencies.add((end - start) / 1000); // microseconds
      }
    }
    latencies.sort(null);
    System.out.println("P50: " + latencies.get(latencies.size() / 2));
    System.out.println("P90: " + latencies.get(latencies.size() * 9 / 10));
    System.out.println("P99: " + latencies.get(latencies.size() * 99 / 100));
  }
}

Before the change:

After the change:
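As an aside on the cache of recently prefetched blocks mentioned in the description: the idea can be pictured as a small fixed-size table of recently prefetched block offsets. The class name, table capacity, and method below are illustrative assumptions, not the actual code in the Lucene90 stored fields reader; the only real API used is `IndexInput#prefetch(long, long)`:

```java
import java.io.IOException;
import java.util.Arrays;
import org.apache.lucene.store.IndexInput;

// Sketch: skip IndexInput#prefetch when the same block was prefetched recently,
// so that several doc IDs clustered in one block trigger only one prefetch call.
final class PrefetchedBlockCache {
  private final long[] recentBlockStarts = new long[16]; // assumed capacity

  PrefetchedBlockCache() {
    Arrays.fill(recentBlockStarts, -1L);
  }

  void prefetch(IndexInput in, long blockStart, long blockLength) throws IOException {
    int slot = Long.hashCode(blockStart) & (recentBlockStarts.length - 1);
    if (recentBlockStarts[slot] == blockStart) {
      return; // this block was prefetched recently, avoid a redundant call
    }
    recentBlockStarts[slot] = blockStart;
    in.prefetch(blockStart, blockLength);
  }
}
```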
rmuir reviewed May 27, 2024
...ava/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingStoredFieldsReader.java
gf2121 reviewed May 29, 2024
...ava/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingStoredFieldsReader.java