From e77b6164293a182bda8e73e8e94dd05373dfd1a6 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sun, 8 May 2022 19:22:07 -0700 Subject: [PATCH] Add a separate Engine implementation for replicas with segment replication enabled. This change adds a new engine intended to be used on replicas with segment replication enabled. This engine does not wire up an IndexWriter, but still writes all operations to a translog. The engine uses a new ReaderManager that refreshes from an externally provided SegmentInfos. Signed-off-by: Marc Handalian --- .../org/opensearch/index/engine/Engine.java | 6 + .../opensearch/index/engine/EngineConfig.java | 21 +- .../index/engine/EngineConfigFactory.java | 6 +- .../index/engine/InternalEngine.java | 18 + .../index/engine/NRTReplicationEngine.java | 331 ++++++++++++++++++ .../engine/NRTReplicationEngineFactory.java | 19 + .../engine/NRTReplicationReaderManager.java | 90 +++++ .../index/engine/ReadOnlyEngine.java | 5 + .../opensearch/index/shard/IndexShard.java | 3 +- .../opensearch/indices/IndicesService.java | 4 + .../engine/EngineConfigFactoryTests.java | 6 +- .../index/engine/InternalEngineTests.java | 9 +- .../engine/NRTReplicationEngineTest.java | 132 +++++++ .../index/shard/IndexShardTests.java | 35 +- .../index/shard/RefreshListenersTests.java | 3 +- .../IndexingMemoryControllerTests.java | 3 +- .../index/engine/EngineTestCase.java | 37 +- 17 files changed, 696 insertions(+), 32 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java create mode 100644 server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java create mode 100644 server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java create mode 100644 server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTest.java diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 6dd067a1158f5..7d59c70429046 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -169,6 +169,12 @@ public final EngineConfig config() { protected abstract SegmentInfos getLastCommittedSegmentInfos(); + /** + * Return the latest active SegmentInfos from the engine. + * @return {@link SegmentInfos} + */ + protected abstract SegmentInfos getLatestSegmentInfos(); + public MergeStats getMergeStats() { return new MergeStats(); } diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index 52dff2826657b..3f6d4568ca592 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -97,6 +97,7 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; + private boolean isReadOnlyReplica; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -171,7 +172,8 @@ public EngineConfig( LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - TombstoneDocSupplier tombstoneDocSupplier + TombstoneDocSupplier tombstoneDocSupplier, + boolean isReadOnlyReplica ) { this( shardId, @@ -196,7 +198,8 @@ public EngineConfig( globalCheckpointSupplier, retentionLeasesSupplier, primaryTermSupplier, - tombstoneDocSupplier + tombstoneDocSupplier, + isReadOnlyReplica ); } @@ -226,7 +229,8 @@ public EngineConfig( LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - TombstoneDocSupplier tombstoneDocSupplier + TombstoneDocSupplier tombstoneDocSupplier, + boolean isReadOnlyReplica ) { this.shardId = shardId; this.indexSettings = indexSettings; @@ -266,6 +270,7 @@ public EngineConfig( this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.tombstoneDocSupplier = tombstoneDocSupplier; + this.isReadOnlyReplica = isReadOnlyReplica; } /** @@ -460,6 +465,16 @@ public LongSupplier getPrimaryTermSupplier() { return primaryTermSupplier; } + /** + * Returns if this replica should be wired as a read only. + * This is used for Segment Replication where the engine implementation used is dependent on + * if the shard is a primary/replica. + * @return true if this engine should be wired as read only. + */ + public boolean isReadOnlyReplica() { + return isReadOnlyReplica; + } + /** * A supplier supplies tombstone documents which will be used in soft-update methods. * The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded. diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java index afab57905a9a7..c8aec3570f8b5 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfigFactory.java @@ -146,7 +146,8 @@ public EngineConfig newEngineConfig( LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, - EngineConfig.TombstoneDocSupplier tombstoneDocSupplier + EngineConfig.TombstoneDocSupplier tombstoneDocSupplier, + boolean isReadOnlyReplica ) { CodecService codecServiceToUse = codecService; if (codecService == null && this.codecServiceFactory != null) { @@ -176,7 +177,8 @@ public EngineConfig newEngineConfig( globalCheckpointSupplier, retentionLeasesSupplier, primaryTermSupplier, - tombstoneDocSupplier + tombstoneDocSupplier, + isReadOnlyReplica ); } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index a111e27f0975f..400d76c3955bb 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -49,6 +49,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.ShuffleForcedMergePolicy; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; +import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.index.Term; import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy; import org.apache.lucene.search.BooleanClause; @@ -2095,6 +2096,23 @@ protected SegmentInfos getLastCommittedSegmentInfos() { return lastCommittedSegmentInfos; } + @Override + public SegmentInfos getLatestSegmentInfos() { + OpenSearchDirectoryReader reader = null; + try { + reader = externalReaderManager.internalReaderManager.acquire(); + return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos(); + } catch (IOException e) { + throw new EngineException(shardId, e.getMessage(), e); + } finally { + try { + externalReaderManager.internalReaderManager.release(reader); + } catch (IOException e) { + throw new EngineException(shardId, e.getMessage(), e); + } + } + } + @Override protected final void writerSegmentStats(SegmentsStats stats) { stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed()); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java new file mode 100644 index 0000000000000..f2ea501c634b9 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -0,0 +1,331 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; +import org.apache.lucene.search.ReferenceManager; +import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.seqno.LocalCheckpointTracker; +import org.opensearch.index.seqno.SeqNoStats; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.translog.Translog; +import org.opensearch.search.suggest.completion.CompletionStats; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.function.BiFunction; + +/** + * This is an {@link Engine} implementation intended for replica shards when Segment Replication + * is enabled. This Engine does not create an IndexWriter, rather it refreshes a {@link NRTReplicationReaderManager} + * with new Segments when received from an external source. + * + * @opensearch.internal + */ +public class NRTReplicationEngine extends TranslogAwareEngine { + + private volatile SegmentInfos lastCommittedSegmentInfos; + private final NRTReplicationReaderManager readerManager; + private final CompletionStatsCache completionStatsCache; + private final LocalCheckpointTracker localCheckpointTracker; + + public NRTReplicationEngine(EngineConfig engineConfig) { + super(engineConfig); + store.incRef(); + try { + lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId)); + final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(this.lastCommittedSegmentInfos.getUserData().entrySet()); + this.localCheckpointTracker = new LocalCheckpointTracker(commitInfo.maxSeqNo, commitInfo.localCheckpoint); + this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats")); + this.readerManager.addListener(completionStatsCache); + } catch (IOException e) { + IOUtils.closeWhileHandlingException(store::decRef, translog); + throw new EngineCreationFailureException(shardId, "failed to create engine", e); + } + } + + public synchronized void updateSegments(final SegmentInfos infos, long seqNo) + throws IOException { + try { + store.incRef(); + // Update the current infos reference on the Engine's reader. + readerManager.updateSegments(infos); + + // only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher + // generation. We can still refresh with incoming SegmentInfos that are not part of a commit point. + if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) { + this.lastCommittedSegmentInfos = infos; + localCheckpointTracker.fastForwardPersistedSeqNo(seqNo); + } + readerManager.maybeRefresh(); + } finally { + store.decRef(); + } + } + + @Override + public String getHistoryUUID() { + return loadHistoryUUID(lastCommittedSegmentInfos.userData); + } + + @Override + public long getWritingBytes() { + return 0; + } + + @Override + public CompletionStats completionStats(String... fieldNamePatterns) { + return completionStatsCache.get(fieldNamePatterns); + } + + @Override + public long getIndexThrottleTimeInMillis() { + return 0; + } + + @Override + public boolean isThrottled() { + return false; + } + + @Override + public IndexResult index(Index index) throws IOException { + IndexResult indexResult = new IndexResult( + index.version(), + index.primaryTerm(), + index.seqNo(), + false + ); + addIndexOperationToTranslog(index, indexResult); + indexResult.setTook(System.nanoTime() - index.startTime()); + indexResult.freeze(); + localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo()); + return indexResult; + } + + @Override + public DeleteResult delete(Delete delete) throws IOException { + DeleteResult deleteResult = new DeleteResult( + delete.version(), + delete.primaryTerm(), + delete.seqNo(), + true + ); + addDeleteOperationToTranslog(delete, deleteResult); + deleteResult.setTook(System.nanoTime() - delete.startTime()); + deleteResult.freeze(); + localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo()); + return deleteResult; + } + + @Override + public NoOpResult noOp(NoOp noOp) throws IOException { + NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo()); + final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); + noOpResult.setTranslogLocation(location); + noOpResult.setTook(System.nanoTime() - noOp.startTime()); + noOpResult.freeze(); + localCheckpointTracker.markSeqNoAsProcessed(noOpResult.getSeqNo()); + return noOpResult; + } + + @Override + public GetResult get(Get get, BiFunction searcherFactory) throws EngineException { + return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL); + } + + @Override + protected ReferenceManager getReferenceManager(SearcherScope scope) { + return readerManager; + } + + @Override + public Closeable acquireHistoryRetentionLock() { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange, boolean accurateCount) throws IOException { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException { + return 0; + } + + + @Override + public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) { + return false; + } + + @Override + public long getMinRetainedSeqNo() { + return localCheckpointTracker.getProcessedCheckpoint(); + } + + @Override + public long getPersistedLocalCheckpoint() { + return localCheckpointTracker.getPersistedCheckpoint(); + } + + public long getProcessedLocalCheckpoint() { + return localCheckpointTracker.getProcessedCheckpoint(); + } + + @Override + public SeqNoStats getSeqNoStats(long globalCheckpoint) { + return localCheckpointTracker.getStats(globalCheckpoint); + } + + @Override + protected LocalCheckpointTracker getLocalCheckpointTracker() { + return localCheckpointTracker; + } + + @Override + public long getIndexBufferRAMBytesUsed() { + return 0; + } + + @Override + public List segments(boolean verbose) { + return Arrays.asList(getSegmentInfo(lastCommittedSegmentInfos, verbose)); + } + + @Override + public void refresh(String source) throws EngineException {} + + @Override + public boolean maybeRefresh(String source) throws EngineException { + return false; + } + + @Override + public void writeIndexingBuffer() throws EngineException {} + + @Override + public boolean shouldPeriodicallyFlush() { + return false; + } + + @Override + public void flush(boolean force, boolean waitIfOngoing) throws EngineException {} + + + @Override + public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, boolean upgradeOnlyAncientSegments, String forceMergeUUID) throws EngineException, IOException {} + + @Override + public GatedCloseable acquireLastIndexCommit(boolean flushFirst) throws EngineException { + store.incRef(); + try { + final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory()); + return new GatedCloseable<>(indexCommit, store::decRef); + } catch (IOException e) { + throw new EngineException(shardId, "Unable to build latest IndexCommit", e); + } + } + + @Override + public GatedCloseable acquireSafeIndexCommit() throws EngineException { + return acquireLastIndexCommit(false); + } + + @Override + public SafeCommitInfo getSafeCommitInfo() { + return new SafeCommitInfo(localCheckpointTracker.getProcessedCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc()); + } + + @Override + protected final void closeNoLock(String reason, CountDownLatch closedLatch) { + if (isClosed.compareAndSet(false, true)) { + assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() + : "Either the write lock must be held or the engine must be currently be failing itself"; + try { + IOUtils.close(readerManager, translog, store::decRef); + } catch (Exception e) { + logger.warn("failed to close engine", e); + } finally { + try { + logger.debug("engine closed [{}]", reason); + } finally { + closedLatch.countDown(); + } + } + } + } + + @Override + protected void deleteUnusedFiles() {} + + @Override + public void activateThrottling() {} + + @Override + public void deactivateThrottling() {} + + @Override + public int fillSeqNoGaps(long primaryTerm) throws IOException { + return 0; + } + + @Override + public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { + throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog."); + } + + @Override + public void skipTranslogRecovery() { + // Do nothing. + } + + @Override + public void maybePruneDeletes() {} + + @Override + public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {} + + @Override + public long getMaxSeqNoOfUpdatesOrDeletes() { + return localCheckpointTracker.getMaxSeqNo(); + } + + @Override + public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {} + + @Override + protected SegmentInfos getLastCommittedSegmentInfos() { + return lastCommittedSegmentInfos; + } + + @Override + protected SegmentInfos getLatestSegmentInfos() { + return readerManager.getSegmentInfos(); + } + + private DirectoryReader getDirectoryReader() throws IOException { + // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. + return new SoftDeletesDirectoryReaderWrapper( + DirectoryReader.open(store.directory()), + Lucene.SOFT_DELETES_FIELD + ); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java new file mode 100644 index 0000000000000..ccbd04129b349 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngineFactory.java @@ -0,0 +1,19 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +public class NRTReplicationEngineFactory implements EngineFactory { + @Override + public Engine newReadWriteEngine(EngineConfig config) { + if (config.isReadOnlyReplica()) { + return new NRTReplicationEngine(config); + } + return new InternalEngine(config); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java new file mode 100644 index 0000000000000..a97f3b191d53c --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; +import org.apache.lucene.index.StandardDirectoryReader; +import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * This is an extension of {@link OpenSearchReaderManager} for use with {@link NRTReplicationEngine}. + * The manager holds a reference to the latest {@link SegmentInfos} object that is used to refresh a reader. + * + * @opensearch.internal + */ +public class NRTReplicationReaderManager extends OpenSearchReaderManager { + + protected static Logger logger = LogManager.getLogger(NRTReplicationReaderManager.class); + private volatile SegmentInfos currentInfos; + + /** + * Creates and returns a new SegmentReplicationReaderManager from the given + * already-opened {@link OpenSearchDirectoryReader}, stealing + * the incoming reference. + * + * @param reader the SegmentReplicationReaderManager to use for future reopens + */ + NRTReplicationReaderManager(OpenSearchDirectoryReader reader) { + super(reader); + currentInfos = unwrapStandardReader(reader).getSegmentInfos(); + } + + @Override + protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException { + Objects.requireNonNull(referenceToRefresh); + final List subs = new ArrayList<>(); + final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh); + for (LeafReaderContext ctx : standardDirectoryReader.leaves()) { + subs.add(ctx.reader()); + } + DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null); + final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper( + innerReader, + Lucene.SOFT_DELETES_FIELD + ); + logger.trace("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader); + return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId()); + } + + /** + * Update this reader's segments and refresh. + * + * @param infos {@link SegmentInfos} infos + * @throws IOException - When Refresh fails with an IOException. + */ + public synchronized void updateSegments(SegmentInfos infos) throws IOException { + currentInfos = infos; + maybeRefresh(); + } + + public SegmentInfos getSegmentInfos() { + return currentInfos; + } + + private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) { + final DirectoryReader delegate = reader.getDelegate(); + if (delegate instanceof SoftDeletesDirectoryReaderWrapper) { + return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate(); + } + return (StandardDirectoryReader) delegate; + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 2e3155a4d173e..23a86d8da5599 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -270,6 +270,11 @@ protected SegmentInfos getLastCommittedSegmentInfos() { return lastCommittedSegmentInfos; } + @Override + protected SegmentInfos getLatestSegmentInfos() { + return lastCommittedSegmentInfos; + } + @Override public String getHistoryUUID() { return lastCommittedSegmentInfos.userData.get(Engine.HISTORY_UUID_KEY); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d8fa560a4a812..64355b2f2b064 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3122,7 +3122,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { globalCheckpointSupplier, replicationTracker::getRetentionLeases, () -> getOperationPrimaryTerm(), - tombstoneDocSupplier() + tombstoneDocSupplier(), + indexSettings.isSegRepEnabled() && shardRouting.primary() == false ); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 87cd63119f1d4..48bf27126924a 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -109,6 +109,7 @@ import org.opensearch.index.engine.EngineConfigFactory; import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.engine.NoOpEngine; import org.opensearch.index.fielddata.IndexFieldDataCache; import org.opensearch.index.flush.FlushStats; @@ -762,6 +763,9 @@ private EngineFactory getEngineFactory(final IndexSettings idxSettings) { .filter(maybe -> Objects.requireNonNull(maybe).isPresent()) .collect(Collectors.toList()); if (engineFactories.isEmpty()) { + if (idxSettings.isSegRepEnabled()) { + return new NRTReplicationEngineFactory(); + } return new InternalEngineFactory(); } else if (engineFactories.size() == 1) { assert engineFactories.get(0).isPresent(); diff --git a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java index 8030619500278..7ddd92ea7b36e 100644 --- a/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/EngineConfigFactoryTests.java @@ -65,7 +65,8 @@ public void testCreateEngineConfigFromFactory() { null, () -> new RetentionLeases(0, 0, Collections.emptyList()), null, - null + null, + false ); assertNotNull(config.getCodec()); @@ -141,7 +142,8 @@ public void testCreateCodecServiceFromFactory() { null, () -> new RetentionLeases(0, 0, Collections.emptyList()), null, - null + null, + false ); assertNotNull(config.getCodec()); } diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index cbae55a047a1e..597bdeb7d69d1 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -3608,7 +3608,8 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> UNASSIGNED_SEQ_NO, () -> RetentionLeases.EMPTY, primaryTerm::get, - tombstoneDocSupplier() + tombstoneDocSupplier(), + false ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -3652,7 +3653,8 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()); + for (Engine.Operation op : operations) { + applyOperation(engine, op); + applyOperation(nrtEngine, op); + } + + assertEquals(nrtEngine.getProcessedLocalCheckpoint(), engine.getProcessedLocalCheckpoint()); + + // we don't index into nrtEngine, so get the doc ids from the regular engine. + final List docs = getDocIds(engine, true); + + // recover a new engine from the nrtEngine's xlog. + nrtEngine.syncTranslog(); + try (InternalEngine engine = new InternalEngine(nrtEngine.config())) { + engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + assertEquals(getDocIds(engine, true), docs); + } + assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog()); + } + } + + public void testUpdateSegments() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + try (final Store nrtEngineStore = createStore(); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);) { + // add docs to the primary engine. + List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()).stream().filter(op -> op.operationType().equals(Engine.Operation.TYPE.INDEX)).collect(Collectors.toList()); + for (Engine.Operation op : operations) { + applyOperation(engine, op); + applyOperation(nrtEngine, op); + } + assertEquals(nrtEngine.getProcessedLocalCheckpoint(), engine.getProcessedLocalCheckpoint()); + + engine.refresh("test"); + + nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint()); + assertMatchingSegmentsAndCheckpoints(nrtEngine); + + // Flush the primary and update the NRTEngine with the latest committed infos. + engine.flush(); + nrtEngine.updateSegments(engine.getLastCommittedSegmentInfos(), engine.getPersistedLocalCheckpoint()); + assertMatchingSegmentsAndCheckpoints(nrtEngine); + + // Ensure the same hit count between engines. + int expectedDocCount; + try (final Engine.Searcher test = engine.acquireSearcher("test")) { + expectedDocCount = test.count(Queries.newMatchAllQuery()); + assertSearcherHits(nrtEngine, expectedDocCount); + } + assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog()); + } + } + + private void assertMatchingSegmentsAndCheckpoints(NRTReplicationEngine nrtEngine) { + assertEquals(engine.getPersistedLocalCheckpoint(), nrtEngine.getPersistedLocalCheckpoint()); + assertEquals(engine.getProcessedLocalCheckpoint(), nrtEngine.getProcessedLocalCheckpoint()); + assertEquals(engine.getLocalCheckpointTracker().getMaxSeqNo(), nrtEngine.getLocalCheckpointTracker().getMaxSeqNo()); + assertEquals(engine.segments(true), nrtEngine.segments(true)); + } + + private void assertSearcherHits(Engine engine, int hits) { + try (final Engine.Searcher test = engine.acquireSearcher("test")) { + MatcherAssert.assertThat(test, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(hits)); + } + } + + private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException { + Lucene.cleanLuceneIndex(store.directory()); + final Path translogDir = createTempDir(); + final EngineConfig replicaConfig = config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get); + if (Lucene.indexExists(store.directory()) == false) { + store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion); + final String translogUuid = Translog.createEmptyTranslog( + replicaConfig.getTranslogConfig().getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + primaryTerm.get() + ); + store.associateIndexWithNewTranslog(translogUuid); + } + return new NRTReplicationEngine(replicaConfig); + } +} diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 04e89daf2e375..b06c8dfae5828 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -100,6 +100,8 @@ import org.opensearch.index.engine.EngineTestCase; import org.opensearch.index.engine.InternalEngine; import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.ReadOnlyEngine; import org.opensearch.index.fielddata.FieldDataStats; import org.opensearch.index.fielddata.IndexFieldData; @@ -134,6 +136,7 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; import org.opensearch.snapshots.Snapshot; import org.opensearch.snapshots.SnapshotId; @@ -4037,7 +4040,7 @@ public void testCloseShardWhileResettingEngine() throws Exception { CountDownLatch closeDoneLatch = new CountDownLatch(1); IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { @Override - public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) + public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { readyToCloseLatch.countDown(); try { @@ -4096,9 +4099,9 @@ public void testSnapshotWhileResettingEngine() throws Exception { CountDownLatch snapshotDoneLatch = new CountDownLatch(1); IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> new InternalEngine(config) { @Override - public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) + public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { - Engine engine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); + InternalEngine engine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); readyToSnapshotLatch.countDown(); try { snapshotDoneLatch.await(); @@ -4378,6 +4381,29 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) { closeShards(readonlyShard); } + public void testReadOnlyReplicaEngineConfig() throws IOException { + Settings primarySettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory()); + assertFalse(primaryShard.getEngine().config().isReadOnlyReplica()); + assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); + + Settings replicaSettings = Settings.builder() + .put(primarySettings) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + final IndexShard replicaShard = newStartedShard(false, replicaSettings, new NRTReplicationEngineFactory()); + assertTrue(replicaShard.getEngine().config().isReadOnlyReplica()); + assertEquals(replicaShard.getEngine().getClass(), NRTReplicationEngine.class); + + + closeShards(primaryShard, replicaShard); + } + + public void testCloseShardWhileEngineIsWarming() throws Exception { CountDownLatch warmerStarted = new CountDownLatch(1); CountDownLatch warmerBlocking = new CountDownLatch(1); @@ -4413,7 +4439,8 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), config.getPrimaryTermSupplier(), - config.getTombstoneDocSupplier() + config.getTombstoneDocSupplier(), + false ); return new InternalEngine(configWithWarmer); }); diff --git a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java index eea316d9a9370..13c5a3bdf0465 100644 --- a/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RefreshListenersTests.java @@ -170,7 +170,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { () -> SequenceNumbers.NO_OPS_PERFORMED, () -> RetentionLeases.EMPTY, () -> primaryTerm, - EngineTestCase.tombstoneDocSupplier() + EngineTestCase.tombstoneDocSupplier(), + false ); engine = new InternalEngine(config); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); diff --git a/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java index c68ad7eaba82e..b44707035a2f6 100644 --- a/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java @@ -422,7 +422,8 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), config.getPrimaryTermSupplier(), - config.getTombstoneDocSupplier() + config.getTombstoneDocSupplier(), + false ); } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index 2bce5a7c81794..10988ea1afe5b 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -264,7 +264,8 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl globalCheckpointSupplier, config.retentionLeasesSupplier(), config.getPrimaryTermSupplier(), - tombstoneDocSupplier() + tombstoneDocSupplier(), + false ); } @@ -291,7 +292,8 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), config.getPrimaryTermSupplier(), - config.getTombstoneDocSupplier() + config.getTombstoneDocSupplier(), + false ); } @@ -318,7 +320,8 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), config.getPrimaryTermSupplier(), - config.getTombstoneDocSupplier() + config.getTombstoneDocSupplier(), + false ); } @@ -328,24 +331,26 @@ public void tearDown() throws Exception { super.tearDown(); try { if (engine != null && engine.isClosed.get() == false) { - engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); - assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); - assertNoInFlightDocuments(engine); - assertMaxSeqNoInCommitUserData(engine); - assertAtMostOneLuceneDocumentPerSequenceNumber(engine); + assertEngineCleanedUp(engine, engine.getTranslog()); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { - replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); - assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine); - assertNoInFlightDocuments(replicaEngine); - assertMaxSeqNoInCommitUserData(replicaEngine); - assertAtMostOneLuceneDocumentPerSequenceNumber(replicaEngine); + assertEngineCleanedUp(replicaEngine, replicaEngine.getTranslog()); } } finally { IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool)); } } + protected void assertEngineCleanedUp(Engine engine, Translog translog) throws Exception { + if (engine.isClosed.get() == false) { + translog.getDeletionPolicy().assertNoOpenTranslogRefs(); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); + assertNoInFlightDocuments(engine); + assertMaxSeqNoInCommitUserData(engine); + assertAtMostOneLuceneDocumentPerSequenceNumber(engine); + } + } + protected static ParseContext.Document testDocumentWithTextField() { return testDocumentWithTextField("test"); } @@ -871,7 +876,8 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm, - tombstoneDocSupplier() + tombstoneDocSupplier(), + false ); } @@ -911,7 +917,8 @@ protected EngineConfig config( config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), config.getPrimaryTermSupplier(), - tombstoneDocSupplier + tombstoneDocSupplier, + false ); }