diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java index 9ca84ee55e95e..df96b3ee08fb3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java @@ -104,7 +104,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.retentionLeasesSupplier(), config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), - config.isPrimary() + config.isReadOnly() ); } diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index ae27768caa68d..481fc414e12e3 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -653,10 +653,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this.indexMetadata = indexMetadata; numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null); isSegrepEnabled = settings.getAsBoolean(IndexMetadata.SETTING_SEGMENT_REPLICATION, false); -<<<<<<< HEAD -======= - ->>>>>>> 5950974c5a2 (toggle) this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings); diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index 3a9d91a1906fe..49b65dab29565 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -95,7 +95,7 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; - private boolean isPrimary; + private boolean isReadOnly; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -171,7 +171,7 @@ public EngineConfig( Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier, - boolean isPrimary + boolean isReadOnly ) { this( shardId, @@ -195,7 +195,7 @@ public EngineConfig( circuitBreakerService, globalCheckpointSupplier, retentionLeasesSupplier, - isPrimary, + isReadOnly, primaryTermSupplier, tombstoneDocSupplier ); @@ -226,7 +226,7 @@ public EngineConfig( CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, - boolean isPrimary, + boolean isReadOnly, LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier ) { @@ -241,7 +241,7 @@ public EngineConfig( this.codecService = codecService; this.eventListener = eventListener; codecName = indexSettings.getValue(INDEX_CODEC_SETTING); - this.isPrimary = isPrimary; + this.isReadOnly = isReadOnly; // We need to make the indexing buffer for this shard at least as large // as the amount of memory that is available for all engines on the // local node so that decisions to flush segments to disk are made by @@ -463,8 +463,8 @@ public LongSupplier getPrimaryTermSupplier() { return primaryTermSupplier; } - public boolean isPrimary() { - return isPrimary; + public boolean isReadOnly() { + return isReadOnly; } /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 6de79a8e91057..f279f2296166b 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -221,6 +221,10 @@ public class InternalEngine extends Engine { @Nullable private volatile String forceMergeUUID; + private boolean isReadOnlyReplica() { + return engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isReadOnly(); + } + public InternalEngine(EngineConfig engineConfig) { this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new); } @@ -277,7 +281,7 @@ public InternalEngine(EngineConfig engineConfig) { ); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); // TODO: Segrep - should have a separate read only engine rather than all this conditional logic. - if ((engineConfig.getIndexSettings().isSegrepEnabled()) && engineConfig.isPrimary() == false) { + if (isReadOnlyReplica()) { // Segrep - hack to make this engine read only and not use writer = null; historyUUID = null; @@ -346,7 +350,7 @@ public InternalEngine(EngineConfig engineConfig) { @Override public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException { - assert engineConfig.isPrimary() == false : "Only replicas should update Infos"; + assert engineConfig.isReadOnly() == true : "Only replicas should update Infos"; SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen); assert gen == infos.getGeneration(); externalReaderManager.internalReaderManager.updateSegments(infos); @@ -582,8 +586,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery translog.currentFileGeneration() ) ); - boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled(); - if ((segrep == false) || (segrep && engineConfig.isPrimary())) { + if (isReadOnlyReplica() == false) { flush(false, true); } translog.trimUnreferencedReaders(); @@ -653,8 +656,7 @@ public Translog.Location getTranslogLastWriteLocation() { private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { if (combinedDeletionPolicy.hasUnreferencedCommits()) { - boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled(); - if ((segrep == false) || (segrep && engineConfig.isPrimary())) { + if (isReadOnlyReplica() == false) { indexWriter.deleteUnusedFiles(); } } @@ -718,7 +720,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external private DirectoryReader getDirectoryReader() throws IOException { // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. - if (engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isPrimary() == false) { + if (isReadOnlyReplica()) { return DirectoryReader.open(store.directory()); } return DirectoryReader.open(indexWriter); @@ -1982,7 +1984,7 @@ public boolean shouldPeriodicallyFlush() { @Override public void flush(boolean force, boolean waitIfOngoing) throws EngineException { - if ((engineConfig.getIndexSettings().isSegrepEnabled()) && (engineConfig.isPrimary() == false)) { + if (isReadOnlyReplica()) { return; } ensureOpen(); @@ -2275,7 +2277,7 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En @Override public SegmentInfosRef getLatestSegmentInfosSafe() { - assert (engineConfig.isPrimary()); + assert (engineConfig.isReadOnly() == false); final SegmentInfos segmentInfos = getLatestSegmentInfos(); try { indexWriter.incRefDeleter(segmentInfos); @@ -2442,8 +2444,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed logger.trace("rollback indexWriter"); try { - boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled(); - if (segrep == false || (segrep && engineConfig.isPrimary())) { + if (isReadOnlyReplica() == false) { indexWriter.rollback(); } } catch (AlreadyClosedException ex) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5d6e98b90fa2d..be13aa8273dae 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2068,9 +2068,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. onSettingsChanged(); // TODO: Segrep - Fix - if (indexSettings.isSegrepEnabled() == false) { - assert assertSequenceNumbersInCommit(); - } + assert assertSequenceNumbersInCommit(); recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); } @@ -2718,9 +2716,7 @@ public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); // TODO: Segrep - Fix retention leases - if (indexSettings.isSegrepEnabled() == false) { - replicationTracker.renewPeerRecoveryRetentionLeases(); - } + replicationTracker.renewPeerRecoveryRetentionLeases(); final Tuple retentionLeases = getRetentionLeases(true); if (retentionLeases.v1()) { logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2()); diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index cb8cc7ef6ab03..0725445a39865 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -3440,7 +3440,7 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> RetentionLeases.EMPTY, primaryTerm::get, tombstoneDocSupplier(), - config.isPrimary() + config.isReadOnly() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -3483,7 +3483,7 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier