From db72e96ada574197330bef0fb9aee0e0bbb6a28b Mon Sep 17 00:00:00 2001
From: Marc Handalian
Date: Sat, 8 Jul 2023 00:35:26 -0700
Subject: [PATCH] Initial get support for segrep

Signed-off-by: Marc Handalian
---
 .../index/engine/NRTReplicationEngine.java      | 114 +++++++++++++++---
 .../index/engine/NRTReplicationEngineTests.java | 109 +++++++++++++++++
 2 files changed, 209 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index 50b5fbb8596a6..9c1b24c81efca 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -12,14 +12,20 @@
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
+import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.util.BytesRef;
 import org.opensearch.common.concurrent.GatedCloseable;
+import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
+import org.opensearch.common.lucene.uid.Versions;
+import org.opensearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.opensearch.common.unit.ByteSizeValue;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.util.concurrent.ReleasableLock;
 import org.opensearch.common.util.io.IOUtils;
+import org.opensearch.index.mapper.IdFieldMapper;
 import org.opensearch.index.seqno.LocalCheckpointTracker;
 import org.opensearch.index.seqno.SeqNoStats;
 import org.opensearch.index.seqno.SequenceNumbers;
@@ -58,7 +64,7 @@ public class NRTReplicationEngine extends Engine {
     private final WriteOnlyTranslogManager translogManager;
 
     private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED;
-
+    private final LiveVersionMap versionMap = new LiveVersionMap();
     private static final int SI_COUNTER_INCREMENT = 10;
 
     public NRTReplicationEngine(EngineConfig engineConfig) {
@@ -168,12 +174,11 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
 
     /**
      * Persist the latest live SegmentInfos.
-     *
+     * <p>
      * This method creates a commit point from the latest SegmentInfos. It is intended to be used when this shard is about to be promoted as the new primary.
-     *
+     * <p>
      * TODO: If this method is invoked while the engine is currently updating segments on its reader, wait for that update to complete so the updated segments are used.
      *
-     *
      * @throws IOException - When there is an IO error committing the SegmentInfos.
      */
     private void commitSegmentInfos(SegmentInfos infos) throws IOException {
@@ -219,6 +224,16 @@ public IndexResult index(Index index) throws IOException {
         indexResult.setTranslogLocation(location);
         indexResult.setTook(System.nanoTime() - index.startTime());
         indexResult.freeze();
+        try (Releasable ignored = versionMap.acquireLock(index.uid().bytes())) {
+            final VersionValue versionFromMap = getVersionFromMap(index.uid().bytes());
+            final long version = versionFromMap != null ? versionFromMap.version : Versions.NOT_FOUND;
+            versionMap.enforceSafeAccess();
+            versionMap.maybePutIndexUnderLock(
+                index.uid().bytes(),
+                new IndexVersionValue(indexResult.getTranslogLocation(), version, index.seqNo(), index.primaryTerm())
+            );
+        }
         localCheckpointTracker.advanceMaxSeqNo(index.seqNo());
         return indexResult;
     }
@@ -226,11 +241,17 @@ public IndexResult index(Index index) throws IOException {
     @Override
     public DeleteResult delete(Delete delete) throws IOException {
         ensureOpen();
+        versionMap.enforceSafeAccess();
         DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
         final Translog.Location location = translogManager.add(new Translog.Delete(delete, deleteResult));
         deleteResult.setTranslogLocation(location);
         deleteResult.setTook(System.nanoTime() - delete.startTime());
         deleteResult.freeze();
+        try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) {
+            versionMap.putDeleteUnderLock(
+                delete.uid().bytes(),
+                new DeleteVersionValue(delete.version(), delete.seqNo(), delete.primaryTerm(), engineConfig.getThreadPool().relativeTimeInMillis())
+            );
+        }
         localCheckpointTracker.advanceMaxSeqNo(delete.seqNo());
         return deleteResult;
     }
@@ -247,9 +268,65 @@ public NoOpResult noOp(NoOp noOp) throws IOException {
         return noOpResult;
     }
 
+    private VersionValue getVersionFromMap(BytesRef id) {
+        if (versionMap.isUnsafe()) {
+            synchronized (versionMap) {
+                // we are switching from an unsafe map to a safe map. This might happen concurrently,
+                // but we only need to do this once since the last operation per ID is to add to the
+                // version map, so once we pass this point we can safely look up from the version map.
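+                // (the same unsafe-to-safe transition exists in InternalEngine#getVersionFromMap,
+                // which refreshes internally at this point; see the TODO below.)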
+                if (versionMap.isUnsafe()) {
+                    // TODO: InternalEngine refreshes here ("unsafe_version_map") before enforcing safe
+                    // access; this engine has no equivalent internal refresh yet.
+                    // refresh("unsafe_version_map", SearcherScope.INTERNAL, true);
+                }
+                versionMap.enforceSafeAccess();
+            }
+        }
+        return versionMap.getUnderLock(id);
+    }
+
     @Override
     public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
-        return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
+        assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
+        try (ReleasableLock ignored = readLock.acquire()) {
+            try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
+                final VersionValue versionFromMap = getVersionFromMap(get.uid().bytes());
+                // a null entry means the id has not been indexed into this engine since it opened;
+                // in that case fall through and search the index
+                if (versionFromMap != null) {
+                    if (versionFromMap.isDelete()) {
+                        return GetResult.NOT_EXISTS;
+                    }
+                    if (versionFromMap.getLocation() != null) {
+                        try {
+                            final Translog.Operation operation = translogManager.readOperation(versionFromMap.getLocation());
+                            if (operation != null) {
+                                final Translog.Index index = (Translog.Index) operation;
+                                TranslogLeafReader reader = new TranslogLeafReader(index);
+                                return new GetResult(
+                                    new Engine.Searcher(
+                                        "realtime_get",
+                                        reader,
+                                        IndexSearcher.getDefaultSimilarity(),
+                                        null,
+                                        IndexSearcher.getDefaultQueryCachingPolicy(),
+                                        reader
+                                    ),
+                                    new VersionsAndSeqNoResolver.DocIdAndVersion(
+                                        0,
+                                        index.version(),
+                                        index.seqNo(),
+                                        index.primaryTerm(),
+                                        reader,
+                                        0
+                                    ),
+                                    true
+                                );
+                            }
+                        } catch (IOException e) {
+                            throw new EngineException(shardId, "failed to read operation from the translog during a realtime get", e);
+                        }
+                    }
+                }
+                // if the doc is not in the translog, search the index
+                return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
+            }
+        }
     }
 
@@ -351,7 +428,8 @@ public boolean maybeRefresh(String source) throws EngineException {
     }
 
     @Override
-    public void writeIndexingBuffer() throws EngineException {}
+    public void writeIndexingBuffer() throws EngineException {
+    }
 
     @Override
     public boolean shouldPeriodicallyFlush() {
@@ -359,7 +437,8 @@ public boolean shouldPeriodicallyFlush() {
     }
 
     @Override
-    public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}
+    public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
+    }
 
     @Override
     public void forceMerge(
@@ -369,13 +448,15 @@ public void forceMerge(
         boolean upgrade,
         boolean upgradeOnlyAncientSegments,
         String forceMergeUUID
-    ) throws EngineException, IOException {}
+    ) throws EngineException, IOException {
+    }
 
     @Override
     public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) throws EngineException {
         try {
             final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory());
-            return new GatedCloseable<>(indexCommit, () -> {});
+            return new GatedCloseable<>(indexCommit, () -> {
+            });
         } catch (IOException e) {
             throw new EngineException(shardId, "Unable to build latest IndexCommit", e);
         }
     }
@@ -417,10 +498,12 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
     }
 
     @Override
-    public void activateThrottling() {}
+    public void activateThrottling() {
+    }
 
     @Override
-    public void deactivateThrottling() {}
+    public void deactivateThrottling() {
+    }
 
     @Override
     public int fillSeqNoGaps(long primaryTerm) throws IOException {
@@ -428,10 +511,12 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
     }
 
     @Override
-    public void maybePruneDeletes() {}
+    public void maybePruneDeletes() {
+    }
 
     @Override
-    public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {}
+    public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
+    }
 
     @Override
     public long getMaxSeqNoOfUpdatesOrDeletes() {
@@ -439,7 +524,8 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
     }
 
     @Override
-    public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {}
+    public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {
+    }
 
     @Override
     public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java
index 3e1112ae3069b..7f984b377b698 100644
--- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java
+++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java
@@ -8,16 +8,25 @@
 package org.opensearch.index.engine;
 
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.index.Term;
 import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.TermQuery;
+import org.hamcrest.MatcherAssert;
 import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.bytes.BytesReference;
 import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.index.IndexSettings;
+import org.opensearch.index.mapper.ParseContext;
+import org.opensearch.index.mapper.ParsedDocument;
+import org.opensearch.index.mapper.SourceFieldMapper;
 import org.opensearch.index.seqno.LocalCheckpointTracker;
 import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.store.Store;
@@ -32,9 +41,12 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
 import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
 import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
@@ -69,6 +81,92 @@ public void testCreateEngine() throws IOException {
         }
     }
 
+    public void testGet() throws IOException {
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (
+            final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
+            final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
+        ) {
+            engine.refresh("warm_up");
+            Engine.Searcher searchResult = engine.acquireSearcher("test");
+            MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
+            searchResult.close();
+
+            nrtEngine.refresh("warm_up");
+            searchResult = nrtEngine.acquireSearcher("test");
+            MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
+            searchResult.close();
+
+            // create a document
+            ParseContext.Document document = testDocumentWithTextField();
+            document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
+            ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
+
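+            // index into both engines; on the NRT replica path this should only write the
+            // translog and the version map, since replicas receive segments from the primary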
+            engine.index(indexForDoc(doc));
+            nrtEngine.index(indexForDoc(doc));
+
+            // neither engine makes the doc visible to searchers until a refresh
+            searchResult = engine.acquireSearcher("test");
+            MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
+            MatcherAssert.assertThat(
+                searchResult,
+                EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)
+            );
+            searchResult.close();
+
+            searchResult = nrtEngine.acquireSearcher("test");
+            MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
+            MatcherAssert.assertThat(
+                searchResult,
+                EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)
+            );
+            searchResult.close();
+
+            final BiFunction<String, Engine.SearcherScope, Engine.Searcher> searcherFactory = engine::acquireSearcher;
+            final BiFunction<String, Engine.SearcherScope, Engine.Searcher> nrtSearcherFactory = nrtEngine::acquireSearcher;
+
+            // the doc is not visible to a non-realtime get
+            try (Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory)) {
+                assertThat(getResult.exists(), equalTo(false));
+            }
+
+            // but a realtime get serves it from the translog
+            try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) {
+                assertThat(getResult.exists(), equalTo(true));
+                assertThat(getResult.docIdAndVersion(), notNullValue());
+            }
+
+            // and so does the NRT replica engine
+            try (Engine.GetResult getResult = nrtEngine.get(newGet(true, doc), nrtSearcherFactory)) {
+                assertThat(getResult.exists(), equalTo(true));
+                assertThat(getResult.docIdAndVersion(), notNullValue());
+            }
+
+            // now do an update
+            document = testDocument();
+            document.add(new TextField("value", "test1", Field.Store.YES));
+            document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_2), SourceFieldMapper.Defaults.FIELD_TYPE));
+            doc = testParsedDocument("1", null, document, B_2, null);
+            engine.index(indexForDoc(doc));
+            nrtEngine.index(indexForDoc(doc));
+
+            // the updated doc is still visible via realtime get
+            try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) {
+                assertThat(getResult.exists(), equalTo(true));
+                assertThat(getResult.docIdAndVersion(), notNullValue());
+            }
+
+            // now delete
+            nrtEngine.delete(new Engine.Delete("1", newUid(doc), primaryTerm.get()));
+
+            // a realtime get must not see the deleted doc
+            try (Engine.GetResult getResult = nrtEngine.get(newGet(true, doc), nrtSearcherFactory)) {
+                assertThat(getResult.exists(), equalTo(false));
+            }
+        }
+    }
+
     public void testEngineWritesOpsToTranslog() throws Exception {
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
@@ -93,6 +191,13 @@ public void testEngineWritesOpsToTranslog() throws Exception {
         );
         assertEquals(nrtEngine.getLastSyncedGlobalCheckpoint(), engine.getLastSyncedGlobalCheckpoint());
 
+        ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
+        engine.index(indexForDoc(doc));
+
+        final BiFunction<String, Engine.SearcherScope, Engine.Searcher> searcherFactory = engine::acquireSearcher;
+        try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) {
+            assertThat(getResult.exists(), equalTo(true));
+        }
+
         // we don't index into nrtEngine, so get the doc ids from the regular engine.
         final List<DocIdSeqNoAndSource> docs = getDocIds(engine, true);
@@ -110,6 +215,10 @@ public void testEngineWritesOpsToTranslog() throws Exception {
         }
     }
 
+    public void testGetRequests() {
+        // TODO: placeholder for dedicated get() coverage; flesh out or remove before merging
+    }
+
     public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOException {
         final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);