Initial get support for segrep
Signed-off-by: Marc Handalian <[email protected]>
mch2 committed Jul 8, 2023
1 parent fc6a2ae commit db72e96
Showing 2 changed files with 209 additions and 14 deletions.
@@ -12,14 +12,20 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
@@ -58,7 +64,7 @@ public class NRTReplicationEngine extends Engine {
private final WriteOnlyTranslogManager translogManager;

private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;

private final LiveVersionMap versionMap = new LiveVersionMap();
private static final int SI_COUNTER_INCREMENT = 10;

public NRTReplicationEngine(EngineConfig engineConfig) {
@@ -168,12 +174,11 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep

/**
* Persist the latest live SegmentInfos.
*
* <p>
* This method creates a commit point from the latest SegmentInfos. It is intended to be used when this shard is about to be promoted as the new primary.
*
* <p>
* TODO: If this method is invoked while the engine is currently updating segments on its reader, wait for that update to complete so the updated segments are used.
*
*
* @throws IOException - When there is an IO error committing the SegmentInfos.
*/
private void commitSegmentInfos(SegmentInfos infos) throws IOException {
@@ -219,18 +224,34 @@ public IndexResult index(Index index) throws IOException {
indexResult.setTranslogLocation(location);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
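// record this operation's translog location in the version map so a realtime get can serve the doc before it is searchable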
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes())) {
final VersionValue versionFromMap = getVersionFromMap(index.uid().bytes());
long version = versionFromMap != null ? versionFromMap.version : Versions.NOT_FOUND;
versionMap.enforceSafeAccess();
versionMap.maybePutIndexUnderLock(index.uid().bytes(),
new IndexVersionValue(indexResult.getTranslogLocation(), version, index.seqNo(), index.primaryTerm())
);
}
localCheckpointTracker.advanceMaxSeqNo(index.seqNo());
return indexResult;
}

@Override
public DeleteResult delete(Delete delete) throws IOException {
ensureOpen();
versionMap.enforceSafeAccess();
DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
final Translog.Location location = translogManager.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
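// record a delete tombstone in the version map so a realtime get observes the delete immediately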
try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) {
versionMap.putDeleteUnderLock(
delete.uid().bytes(),
new DeleteVersionValue(delete.version(), delete.seqNo(), delete.primaryTerm(), engineConfig.getThreadPool().relativeTimeInMillis())
);
}
localCheckpointTracker.advanceMaxSeqNo(delete.seqNo());
return deleteResult;
}
@@ -247,9 +268,65 @@ public NoOpResult noOp(NoOp noOp) throws IOException {
return noOpResult;
}

private VersionValue getVersionFromMap(BytesRef id) {
if (versionMap.isUnsafe()) {
synchronized (versionMap) {
// we are switching from an unsafe map to a safe map. This might happen concurrently
// but we only need to do this once since the last operation per ID is to add to the version
// map so once we pass this point we can safely lookup from the version map.
if (versionMap.isUnsafe()) {
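// InternalEngine refreshes here ("unsafe_version_map") before switching to safe access; that refresh is left disabled in this engine for now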
// refresh("unsafe_version_map", SearcherScope.INTERNAL, true);
}
versionMap.enforceSafeAccess();
}
}
return versionMap.getUnderLock(id);
}

@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
final VersionValue versionFromMap = getVersionFromMap(get.uid().bytes());
logger.trace("version map entries: {}", versionMap.getAllCurrent());
if (versionFromMap == null) {
// no entry in the version map for this id; fall back to searching the index
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
}
if (versionFromMap.isDelete()) {
return GetResult.NOT_EXISTS;
}
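// the version map entry holds the translog location of the latest operation for this id; serve the get straight from the translog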
if (versionFromMap.getLocation() != null) {
try {
final Translog.Operation operation = translogManager.readOperation(versionFromMap.getLocation());
if (operation != null) {
final Translog.Index index = (Translog.Index) operation;
TranslogLeafReader reader = new TranslogLeafReader(index);
return new GetResult(
new Engine.Searcher(
"realtime_get",
reader,
IndexSearcher.getDefaultSimilarity(),
null,
IndexSearcher.getDefaultQueryCachingPolicy(),
reader
),
new VersionsAndSeqNoResolver.DocIdAndVersion(
0,
index.version(),
index.seqNo(),
index.primaryTerm(),
reader,
0
),
true
);
}
} catch (IOException e) {
logger.error("WTF", e);
}
}
// the doc is not retrievable from the translog; fall back to searching the index
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
}
}
}

@Override
@@ -351,15 +428,17 @@ public boolean maybeRefresh(String source) throws EngineException {
}

@Override
public void writeIndexingBuffer() throws EngineException {}
public void writeIndexingBuffer() throws EngineException {
}

@Override
public boolean shouldPeriodicallyFlush() {
return false;
}

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
}

@Override
public void forceMerge(
@@ -369,13 +448,15 @@ public void forceMerge(
boolean upgrade,
boolean upgradeOnlyAncientSegments,
String forceMergeUUID
) throws EngineException, IOException {}
) throws EngineException, IOException {
}

@Override
public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) throws EngineException {
try {
final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory());
return new GatedCloseable<>(indexCommit, () -> {});
return new GatedCloseable<>(indexCommit, () -> {
});
} catch (IOException e) {
throw new EngineException(shardId, "Unable to build latest IndexCommit", e);
}
@@ -417,29 +498,34 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
}

@Override
public void activateThrottling() {}
public void activateThrottling() {
}

@Override
public void deactivateThrottling() {}
public void deactivateThrottling() {
}

@Override
public int fillSeqNoGaps(long primaryTerm) throws IOException {
return 0;
}

@Override
public void maybePruneDeletes() {}
public void maybePruneDeletes() {
}

@Override
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {}
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
}

@Override
public long getMaxSeqNoOfUpdatesOrDeletes() {
return localCheckpointTracker.getMaxSeqNo();
}

@Override
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {}
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
}

@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
@@ -8,16 +8,25 @@

package org.opensearch.index.engine;

import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.TermQuery;
import org.hamcrest.MatcherAssert;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.ParseContext;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.mapper.SourceFieldMapper;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
@@ -32,9 +41,12 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
@@ -69,6 +81,92 @@ public void testCreateEngine() throws IOException {
}
}

public void testGet() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
engine.refresh("warm_up");
Engine.Searcher searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();

nrtEngine.refresh("warm_up");
searchResult = nrtEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();

// create a document
ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
engine.index(indexForDoc(doc));
nrtEngine.index(indexForDoc(doc));

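// no refresh has happened since indexing, so the searchers below still report zero hits; the doc is only reachable through a realtime get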
searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(
searchResult,
EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)
);
searchResult.close();

searchResult = nrtEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(
searchResult,
EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)
);
searchResult.close();

final BiFunction<String, Engine.SearcherScope, Engine.Searcher> searcherFactory = engine::acquireSearcher;
final BiFunction<String, Engine.SearcherScope, Engine.Searcher> nrtSearcherFactory = nrtEngine::acquireSearcher;


// the doc is not visible to a non-realtime get (the index has not been refreshed since the write)
try (Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(false));
}

// but, we can still get it (in realtime)
try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
logger.info(getResult.docIdAndVersion().docId);
}

// the nrt replica engine can also serve the doc with a realtime get
try (Engine.GetResult getResult = nrtEngine.get(newGet(true, doc), nrtSearcherFactory)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
logger.info(getResult.docIdAndVersion().docBase);
}

// now do an update
document = testDocument();
document.add(new TextField("value", "test1", Field.Store.YES));
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_2), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", null, document, B_2, null);
engine.index(indexForDoc(doc));
nrtEngine.index(indexForDoc(doc));

// but, we can still get it (in realtime)
try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
}
// now delete
nrtEngine.delete(new Engine.Delete("1", newUid(doc), primaryTerm.get()));

// get should not see it (in realtime)
try (Engine.GetResult getResult = nrtEngine.get(newGet(true, doc), nrtSearcherFactory)) {
assertThat(getResult.exists(), equalTo(false));
}

}
}

public void testEngineWritesOpsToTranslog() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

@@ -93,6 +191,13 @@ public void testEngineWritesOpsToTranslog() throws Exception {
);
assertEquals(nrtEngine.getLastSyncedGlobalCheckpoint(), engine.getLastSyncedGlobalCheckpoint());

ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));

final BiFunction<String, Engine.SearcherScope, Engine.Searcher> searcherFactory = engine::acquireSearcher;
try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(true));
}

// we don't index into nrtEngine, so get the doc ids from the regular engine.
final List<DocIdSeqNoAndSource> docs = getDocIds(engine, true);

@@ -110,6 +215,10 @@
}
}

public void testGetRequests() {

}

public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

