Skip to content

Commit

Permalink
Merge branch 'main' into shard-level-restore
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale authored Jul 26, 2023
2 parents 5b3df83 + 4319f2b commit 9ad837f
Show file tree
Hide file tree
Showing 31 changed files with 109 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792))
- Remote Segment Store Repository setting moved from `index.remote_store.repository` to `index.remote_store.segment.repository` and `cluster.remote_store.repository` to `cluster.remote_store.segment.repository` respectively for Index and Cluster level settings ([#8719](https://github.com/opensearch-project/OpenSearch/pull/8719))
- [Remote Store] Add support to restore only unassigned shards of an index ([#8792](https://github.com/opensearch-project/OpenSearch/pull/8792))
- Replace the deprecated IndexReader APIs with new storedFields() & termVectors() ([#7792](https://github.com/opensearch-project/OpenSearch/pull/7792))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testPrimaryRelocation() throws Exception {
createIndex(1);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(100, 1000);
final int initialDocCount = scaledRandomIntBetween(10, 100);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testPrimaryRelocationWithSegRepFailure() throws Exception {
createIndex(1);
final String replica = internalCluster().startNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(100, 1000);
final int initialDocCount = scaledRandomIntBetween(10, 100);
final WriteRequest.RefreshPolicy refreshPolicy = randomFrom(WriteRequest.RefreshPolicy.values());
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
for (int i = 0; i < initialDocCount; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermVectors;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.BooleanClause;
Expand Down Expand Up @@ -808,8 +810,10 @@ public String describeParams() {
*/
private PriorityQueue<ScoreTerm> retrieveTerms(int docNum) throws IOException {
Map<String, Int> termFreqMap = new HashMap<>();
final TermVectors termVectors = ir.termVectors();
final StoredFields storedFields = ir.storedFields();
for (String fieldName : fieldNames) {
final Fields vectors = ir.getTermVectors(docNum);
final Fields vectors = termVectors.get(docNum);
final Terms vector;
if (vectors != null) {
vector = vectors.terms(fieldName);
Expand All @@ -819,7 +823,7 @@ private PriorityQueue<ScoreTerm> retrieveTerms(int docNum) throws IOException {

// field does not store term vector info
if (vector == null) {
Document d = ir.document(docNum);
Document d = storedFields.document(docNum);
IndexableField fields[] = d.getFields(fieldName);
for (IndexableField field : fields) {
final String stringValue = field.stringValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
Expand Down Expand Up @@ -507,12 +508,11 @@ private static void consumeFromType(IndexSearcher indexSearcher, String type, Ch
final Bits liveDocs = leafReaderContext.reader().getLiveDocs();
final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get;
final DocIdSetIterator docIdSetIterator = scorer.iterator();
final StoredFields storedFields = leafReaderContext.reader().storedFields();
while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
if (isLiveDoc.test(docIdSetIterator.docID())) {
logger.trace("processing doc {}", docIdSetIterator.docID());
bytesRefConsumer.accept(
leafReaderContext.reader().document(docIdSetIterator.docID()).getBinaryValue(DATA_FIELD_NAME)
);
bytesRefConsumer.accept(storedFields.document(docIdSetIterator.docID()).getBinaryValue(DATA_FIELD_NAME));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
Expand Down Expand Up @@ -2889,14 +2890,15 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead
final CombinedDocValues dv = new CombinedDocValues(leaf.reader());
final IdOnlyFieldVisitor idFieldVisitor = new IdOnlyFieldVisitor();
final DocIdSetIterator iterator = scorer.iterator();
final StoredFields storedFields = leaf.reader().storedFields();
int docId;
while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
final long primaryTerm = dv.docPrimaryTerm(docId);
final long seqNo = dv.docSeqNo(docId);
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
localCheckpointTracker.markSeqNoAsPersisted(seqNo);
idFieldVisitor.reset();
leaf.reader().document(docId, idFieldVisitor);
storedFields.document(docId, idFieldVisitor);
if (idFieldVisitor.getId() == null) {
assert dv.isTombstone(docId);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
? SourceFieldMapper.RECOVERY_SOURCE_NAME
: SourceFieldMapper.NAME;
final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
leaf.reader().document(segmentDocID, fields);
leaf.reader().storedFields().document(segmentDocID, fields);

final Translog.Operation op;
final boolean isTombstone = parallelArray.isTombStone[docIndex];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.readerManager = readerManager;
this.readerManager.addListener(completionStatsCache);
for (ReferenceManager.RefreshListener listener : engineConfig.getExternalRefreshListener()) {
this.readerManager.addListener(listener);
}
// NRT Replicas do not have a concept of Internal vs External reader managers.
// We also do not want to wire up refresh listeners for waitFor & pending refresh location.
// which are the current external listeners set from IndexShard.
// Only wire up the internal listeners.
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
this.readerManager.addListener(listener);
}
Expand Down Expand Up @@ -322,22 +323,12 @@ public List<Segment> segments(boolean verbose) {

@Override
public void refresh(String source) throws EngineException {
maybeRefresh(source);
// Refresh on this engine should only ever happen in the reader after new segments arrive.
}

@Override
public boolean maybeRefresh(String source) throws EngineException {
ensureOpen();
try {
return readerManager.maybeRefresh();
} catch (IOException e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void updateSegments(SegmentInfos infos) throws IOException {
// is always increased.
infos.updateGeneration(currentInfos);
currentInfos = infos;
maybeRefreshBlocking();
maybeRefresh();
}

public SegmentInfos getSegmentInfos() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,28 +221,33 @@ public int maxDoc() {

@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
if (docID != 0) {
throw new IllegalArgumentException("no such doc ID " + docID);
}
if (visitor.needsField(FAKE_SOURCE_FIELD) == StoredFieldVisitor.Status.YES) {
assert operation.source().toBytesRef().offset == 0;
assert operation.source().toBytesRef().length == operation.source().toBytesRef().bytes.length;
visitor.binaryField(FAKE_SOURCE_FIELD, operation.source().toBytesRef().bytes);
}
if (operation.routing() != null && visitor.needsField(FAKE_ROUTING_FIELD) == StoredFieldVisitor.Status.YES) {
visitor.stringField(FAKE_ROUTING_FIELD, operation.routing());
}
if (visitor.needsField(FAKE_ID_FIELD) == StoredFieldVisitor.Status.YES) {
BytesRef bytesRef = Uid.encodeId(operation.id());
final byte[] id = new byte[bytesRef.length];
System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length);
visitor.binaryField(FAKE_ID_FIELD, id);
}
storedFields().document(docID, visitor);
}

@Override
public StoredFields storedFields() throws IOException {
throw new UnsupportedOperationException();
return new StoredFields() {
@Override
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
if (docID != 0) {
throw new IllegalArgumentException("no such doc ID " + docID);
}
if (visitor.needsField(FAKE_SOURCE_FIELD) == StoredFieldVisitor.Status.YES) {
assert operation.source().toBytesRef().offset == 0;
assert operation.source().toBytesRef().length == operation.source().toBytesRef().bytes.length;
visitor.binaryField(FAKE_SOURCE_FIELD, operation.source().toBytesRef().bytes);
}
if (operation.routing() != null && visitor.needsField(FAKE_ROUTING_FIELD) == StoredFieldVisitor.Status.YES) {
visitor.stringField(FAKE_ROUTING_FIELD, operation.routing());
}
if (visitor.needsField(FAKE_ID_FIELD) == StoredFieldVisitor.Status.YES) {
BytesRef bytesRef = Uid.encodeId(operation.id());
final byte[] id = new byte[bytesRef.length];
System.arraycopy(bytesRef.bytes, bytesRef.offset, id, 0, bytesRef.length);
visitor.binaryField(FAKE_ID_FIELD, id);
}
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private GetResult innerGetLoadFromStoredFields(
);
if (fieldVisitor != null) {
try {
docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
docIdAndVersion.reader.storedFields().document(docIdAndVersion.docId, fieldVisitor);
} catch (IOException e) {
throw new OpenSearchException("Failed to get id [" + id + "]", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4423,7 +4423,8 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
readAllowed = isReadAllowed();
}
}
if (readAllowed) {
// NRT Replicas will not accept refresh listeners.
if (readAllowed && isSegmentReplicationAllowed() == false) {
refreshListeners.addOrNotify(location, listener);
} else {
// we're not yet ready fo ready for reads, just ignore refresh cycles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public Status needsField(FieldInfo fieldInfo) throws IOException {
boolean matches(int doc) throws IOException {
routing = id = null;
leftToVisit = 2;
leafReader.document(doc, this);
leafReader.storedFields().document(doc, this);
assert id != null : "docID must not be null - we might have hit a nested document";
int targetShardId = OperationRouting.generateShardId(indexMetadata, id, routing);
return targetShardId != shardId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.MultiTerms;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermVectors;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.memory.MemoryIndex;
import org.opensearch.OpenSearchException;
Expand Down Expand Up @@ -127,7 +128,8 @@ static TermVectorsResponse getTermVectors(IndexShard indexShard, TermVectorsRequ
/* or from an existing document */
else if (docIdAndVersion != null) {
// fields with stored term vectors
termVectorsByField = docIdAndVersion.reader.getTermVectors(docIdAndVersion.docId);
TermVectors termVectors = docIdAndVersion.reader.termVectors();
termVectorsByField = termVectors.get(docIdAndVersion.docId);
Set<String> selectedFields = request.selectedFields();
// generate tvs for fields where analyzer is overridden
if (selectedFields == null && request.perFieldAnalyzer() != null) {
Expand Down Expand Up @@ -322,7 +324,8 @@ private static Fields generateTermVectors(
}
}
/* and read vectors from it */
return index.createSearcher().getIndexReader().getTermVectors(0);
TermVectors termVectors = index.createSearcher().getIndexReader().termVectors();
return termVectors.get(0);
}

private static Fields generateTermVectorsFromDoc(IndexShard indexShard, TermVectorsRequest request) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void execute(SearchContext context) {
SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) currentReaderContext.reader();
fieldReader = lf.getSequentialStoredFieldsReader()::document;
} else {
fieldReader = currentReaderContext.reader()::document;
fieldReader = currentReaderContext.reader().storedFields()::document;
}
for (FetchSubPhaseProcessor processor : processors) {
processor.setNextReader(currentReaderContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static List<Object> loadFieldValues(
) throws IOException {
if (forceSource == false && fieldType.isStored()) {
CustomFieldsVisitor fieldVisitor = new CustomFieldsVisitor(singleton(fieldType.name()), false);
hitContext.reader().document(hitContext.docId(), fieldVisitor);
hitContext.reader().storedFields().document(hitContext.docId(), fieldVisitor);
List<Object> textsToHighlight = fieldVisitor.fields().get(fieldType.name());
return textsToHighlight != null ? textsToHighlight : Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private FieldLookup loadFieldData(String name) {
List<Object> values = new ArrayList<>(2);
SingleFieldsVisitor visitor = new SingleFieldsVisitor(data.fieldType(), values);
try {
reader.document(docId, visitor);
reader.storedFields().document(docId, visitor);
} catch (IOException e) {
throw new OpenSearchParseException("failed to load field [{}]", e, name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public void setSegmentAndDocument(LeafReaderContext context, int docId) {
SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) context.reader();
fieldReader = lf.getSequentialStoredFieldsReader()::document;
} else {
fieldReader = context.reader()::document;
fieldReader = context.reader().storedFields()::document;
}
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
Expand Down Expand Up @@ -565,12 +566,13 @@ public void testWrapAllDocsLive() throws Exception {
}
try (DirectoryReader unwrapped = DirectoryReader.open(writer)) {
DirectoryReader reader = Lucene.wrapAllDocsLive(unwrapped);
StoredFields storedFields = reader.storedFields();
assertThat(reader.numDocs(), equalTo(liveDocs.size()));
IndexSearcher searcher = new IndexSearcher(reader);
Set<String> actualDocs = new HashSet<>();
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
actualDocs.add(reader.document(scoreDoc.doc).get("id"));
actualDocs.add(storedFields.document(scoreDoc.doc).get("id"));
}
assertThat(actualDocs, equalTo(liveDocs));
}
Expand Down Expand Up @@ -609,13 +611,14 @@ public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception {
}
try (DirectoryReader unwrapped = DirectoryReader.open(writer)) {
DirectoryReader reader = Lucene.wrapAllDocsLive(unwrapped);
StoredFields storedFields = reader.storedFields();
assertThat(reader.maxDoc(), equalTo(numDocs + abortedDocs));
assertThat(reader.numDocs(), equalTo(liveDocs.size()));
IndexSearcher searcher = new IndexSearcher(reader);
List<String> actualDocs = new ArrayList<>();
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
actualDocs.add(reader.document(scoreDoc.doc).get("id"));
actualDocs.add(storedFields.document(scoreDoc.doc).get("id"));
}
assertThat(actualDocs, equalTo(liveDocs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.search.Query;
Expand Down Expand Up @@ -149,9 +150,10 @@ public void setUp() throws Exception {

// now go over each doc, build the relevant references and filter
reader = DirectoryReader.open(iw);
StoredFields storedFields = reader.storedFields();
List<BytesRef> filterTerms = new ArrayList<>();
for (int docId = 0; docId < reader.maxDoc(); docId++) {
Document doc = reader.document(docId);
Document doc = storedFields.document(docId);
addFreqs(doc, referenceAll);
if (!deletedIds.contains(doc.getField("id").stringValue())) {
addFreqs(doc, referenceNotDeleted);
Expand Down
Loading

0 comments on commit 9ad837f

Please sign in to comment.