From cd037542eca3515a4837c4d33cfb7a7674556c1e Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 28 Feb 2022 07:24:14 -0800 Subject: [PATCH 1/4] toggle Signed-off-by: Poojita Raj index writer null exception Signed-off-by: Poojita Raj included conditional logic Signed-off-by: Poojita Raj internal listener fix Signed-off-by: Poojita Raj --- .../org/opensearch/index/IndexService.java | 4 +- .../org/opensearch/index/IndexSettings.java | 6 +- .../index/engine/InternalEngine.java | 29 +++-- .../index/engine/OpenSearchReaderManager.java | 9 +- .../opensearch/index/shard/IndexShard.java | 112 ++++++++++-------- 5 files changed, 86 insertions(+), 74 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index c1d3f4ddab147..8f588e615e34d 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -915,9 +915,7 @@ private void maybeRefreshEngine(boolean force) { if (indexSettings.getRefreshInterval().millis() > 0 || force) { for (IndexShard shard : this.shards.values()) { try { - if (shard.routingEntry().primary()) { - shard.scheduledRefresh(); - } + shard.scheduledRefresh(); } catch (IndexShardClosedException | AlreadyClosedException ex) { // fine - continue; } diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 914531e2e5acd..ae27768caa68d 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -653,6 +653,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this.indexMetadata = indexMetadata; numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null); isSegrepEnabled = settings.getAsBoolean(IndexMetadata.SETTING_SEGMENT_REPLICATION, false); +<<<<<<< HEAD +======= + +>>>>>>> 5950974c5a2 (toggle) this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings); @@ -883,7 +887,7 @@ public int getNumberOfReplicas() { } public boolean isSegrepEnabled() { - return isSegrepEnabled; + return Boolean.TRUE.equals(isSegrepEnabled); } /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 3b69a8a25753e..48746e34f5f63 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -277,17 +277,17 @@ public InternalEngine(EngineConfig engineConfig) { ); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); // TODO: Segrep - should have a separate read only engine rather than all this conditional logic. - if (engineConfig.isPrimary()) { - writer = createWriter(); - bootstrapAppendOnlyInfoFromWriter(writer); - final Map commitData = commitDataAsMap(writer); - historyUUID = loadHistoryUUID(commitData); - forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); - } else { + if ((engineConfig.getIndexSettings().isSegrepEnabled()) && engineConfig.isPrimary() == false) { // Segrep - hack to make this engine read only and not use writer = null; historyUUID = null; forceMergeUUID = null; + } else { + writer = createWriter(); + bootstrapAppendOnlyInfoFromWriter(writer); + final Map commitData = commitDataAsMap(writer); + historyUUID = loadHistoryUUID(commitData); + forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); } indexWriter = writer; } catch (IOException | TranslogCorruptedException e) { @@ -582,7 +582,8 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery translog.currentFileGeneration() ) ); - if (engineConfig.isPrimary()) { + boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled(); + if ((segrep == false) || (segrep && engineConfig.isPrimary())) { flush(false, true); } translog.trimUnreferencedReaders(); @@ -652,7 +653,8 @@ public Translog.Location getTranslogLastWriteLocation() { private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { if (combinedDeletionPolicy.hasUnreferencedCommits()) { - if (engineConfig.isPrimary()) { + boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled(); + if ((segrep == false) || (segrep && engineConfig.isPrimary())) { indexWriter.deleteUnusedFiles(); } } @@ -715,8 +717,8 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external } private DirectoryReader getDirectoryReader() throws IOException { - // replicas should create the reader from store, we don't want an open IW on replicas. - if (engineConfig.isPrimary() == false) { + // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. + if (engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isPrimary() == false) { return DirectoryReader.open(store.directory()); } return DirectoryReader.open(indexWriter); @@ -1980,7 +1982,7 @@ public boolean shouldPeriodicallyFlush() { @Override public void flush(boolean force, boolean waitIfOngoing) throws EngineException { - if (engineConfig.isPrimary() == false) { + if ((engineConfig.getIndexSettings().isSegrepEnabled()) && (engineConfig.isPrimary() == false)) { return; } ensureOpen(); @@ -2440,7 +2442,8 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed logger.trace("rollback indexWriter"); try { - if (engineConfig.isPrimary()) { + boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled(); + if (segrep == false || (segrep && engineConfig.isPrimary())) { indexWriter.rollback(); } } catch (AlreadyClosedException ex) { diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index da2365a94aa8a..a0f9f9c5d7a5d 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -32,10 +32,6 @@ package org.opensearch.index.engine; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.DirectoryReader; @@ -44,11 +40,14 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.search.ReferenceManager; - import org.apache.lucene.search.SearcherManager; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * Utility class to safely share {@link OpenSearchDirectoryReader} instances across * multiple threads, while periodically reopening. This class ensures each diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d4753e7763e6e..886f31457bbf4 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -845,8 +845,7 @@ public Engine.IndexResult applyIndexOperationOnReplica( boolean isRetry, SourceToParse sourceToParse ) throws IOException { - Boolean isSegRepEnabled = indexSettings.isSegrepEnabled(); - if (isSegRepEnabled != null && isSegRepEnabled) { + if (indexSettings.isSegrepEnabled()) { Engine.Index index; try { index = parseSourceAndPrepareIndex( @@ -3073,32 +3072,35 @@ public void startRecovery( case PEER: try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); - IndexShard indexShard = this; - segmentReplicationReplicaService.prepareForReplication( - this, - recoveryState.getTargetNode(), - recoveryState.getSourceNode(), - new ActionListener() { - @Override - public void onResponse(TrackShardResponse unused) { - replicationListener.onReplicationDone(replicationState); - recoveryState.getIndex().setFileDetailsComplete(); - finalizeRecovery(); - postRecovery("Shard setup complete."); - } + if (indexSettings.isSegrepEnabled()) { + IndexShard indexShard = this; + segmentReplicationReplicaService.prepareForReplication( + this, + recoveryState.getTargetNode(), + recoveryState.getSourceNode(), + new ActionListener() { + @Override + public void onResponse(TrackShardResponse unused) { + replicationListener.onReplicationDone(replicationState); + recoveryState.getIndex().setFileDetailsComplete(); + finalizeRecovery(); + postRecovery("Shard setup complete."); + } - @Override - public void onFailure(Exception e) { - replicationListener.onReplicationFailure( - replicationState, - new ReplicationFailedException(indexShard, e), - true - ); + @Override + public void onFailure(Exception e) { + replicationListener.onReplicationFailure( + replicationState, + new ReplicationFailedException(indexShard, e), + true + ); + } } - } - ); + ); + } else { + peerRecoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener); + } } catch (Exception e) { - logger.error("Error preparing the shard for Segment replication", e); failShard("corrupted preexisting index", e); recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); } @@ -3295,31 +3297,37 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { this.warmer.warm(reader); } }; - return this.engineConfigFactory.newEngineConfig( - shardId, - threadPool, - indexSettings, - warmer, - store, - indexSettings.getMergePolicy(), - mapperService != null ? mapperService.indexAnalyzer() : null, - similarityService.similarity(mapperService), - codecService, - shardEventListener, - indexCache != null ? indexCache.query() : null, - cachingPolicy, - translogConfig, - IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), - Arrays.asList(refreshListeners, refreshPendingLocationListener), - Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener), - indexSort, - circuitBreakerService, - globalCheckpointSupplier, - replicationTracker::getRetentionLeases, - () -> getOperationPrimaryTerm(), - tombstoneDocSupplier(), - shardRouting.primary() - ); + final List internalRefreshListener; + if (indexSettings.isSegrepEnabled()) { + internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener); + } else { + internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); + } + return this.engineConfigFactory.newEngineConfig( + shardId, + threadPool, + indexSettings, + warmer, + store, + indexSettings.getMergePolicy(), + mapperService != null ? mapperService.indexAnalyzer() : null, + similarityService.similarity(mapperService), + codecService, + shardEventListener, + indexCache != null ? indexCache.query() : null, + cachingPolicy, + translogConfig, + IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), + Arrays.asList(refreshListeners, refreshPendingLocationListener), + internalRefreshListener, + indexSort, + circuitBreakerService, + globalCheckpointSupplier, + replicationTracker::getRetentionLeases, + () -> getOperationPrimaryTerm(), + tombstoneDocSupplier(), + shardRouting.primary() + ); } /** @@ -3910,8 +3918,8 @@ ReplicationTracker getReplicationTracker() { public boolean scheduledRefresh() { // skip if not primary shard. // TODO: Segrep - should split into primary/replica classes. - if (shardRouting.primary() == false) { - return false; + if ((indexSettings.isSegrepEnabled()) &&(shardRouting.primary() == false) ) { + return false; } verifyNotClosed(); boolean listenerNeedsRefresh = refreshListeners.refreshNeeded(); From 56ef936674972baab284302bc2c25dd73added70 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 7 Mar 2022 09:44:36 -0800 Subject: [PATCH 2/4] docrep working Signed-off-by: Poojita Raj --- .../index/engine/InternalEngine.java | 10 +-- .../opensearch/index/shard/IndexShard.java | 70 ++++++++++--------- .../indices/recovery/MultiFileWriter.java | 5 +- 3 files changed, 47 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 48746e34f5f63..6de79a8e91057 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -283,11 +283,11 @@ public InternalEngine(EngineConfig engineConfig) { historyUUID = null; forceMergeUUID = null; } else { - writer = createWriter(); - bootstrapAppendOnlyInfoFromWriter(writer); - final Map commitData = commitDataAsMap(writer); - historyUUID = loadHistoryUUID(commitData); - forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); + writer = createWriter(); + bootstrapAppendOnlyInfoFromWriter(writer); + final Map commitData = commitDataAsMap(writer); + historyUUID = loadHistoryUUID(commitData); + forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); } indexWriter = writer; } catch (IOException | TranslogCorruptedException e) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 886f31457bbf4..5d6e98b90fa2d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1277,7 +1277,7 @@ public void refresh(String source) { */ public long getWritingBytes() { // TODO: Segrep: hack - if not the primary our IW is null and this blows up. - if (shardRouting.primary() == false) { + if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) { return 0L; } Engine engine = getEngineOrNull(); @@ -2029,7 +2029,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException { public void openEngineAndSkipTranslogRecovery() throws IOException { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; // TODO: Segrep - fix initial recovery stages from ReplicationTarget. - // recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + if (indexSettings.isSegrepEnabled() == false) { + recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + } loadGlobalCheckpointToReplicationTracker(); innerOpenEngineAndTranslog(replicationTracker); getEngine().skipTranslogRecovery(); @@ -2066,7 +2068,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. onSettingsChanged(); // TODO: Segrep - Fix - // assert assertSequenceNumbersInCommit(); + if (indexSettings.isSegrepEnabled() == false) { + assert assertSequenceNumbersInCommit(); + } recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); } @@ -2238,7 +2242,7 @@ protected final void verifyActive() throws IllegalIndexShardStateException { * Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */ public long getIndexBufferRAMBytesUsed() { - if (shardRouting.primary() == false) { + if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) { return 0; } Engine engine = getEngineOrNull(); @@ -2714,7 +2718,9 @@ public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); // TODO: Segrep - Fix retention leases - // replicationTracker.renewPeerRecoveryRetentionLeases(); + if (indexSettings.isSegrepEnabled() == false) { + replicationTracker.renewPeerRecoveryRetentionLeases(); + } final Tuple retentionLeases = getRetentionLeases(true); if (retentionLeases.v1()) { logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2()); @@ -3303,31 +3309,31 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { } else { internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); } - return this.engineConfigFactory.newEngineConfig( - shardId, - threadPool, - indexSettings, - warmer, - store, - indexSettings.getMergePolicy(), - mapperService != null ? mapperService.indexAnalyzer() : null, - similarityService.similarity(mapperService), - codecService, - shardEventListener, - indexCache != null ? indexCache.query() : null, - cachingPolicy, - translogConfig, - IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), - Arrays.asList(refreshListeners, refreshPendingLocationListener), - internalRefreshListener, - indexSort, - circuitBreakerService, - globalCheckpointSupplier, - replicationTracker::getRetentionLeases, - () -> getOperationPrimaryTerm(), - tombstoneDocSupplier(), - shardRouting.primary() - ); + return this.engineConfigFactory.newEngineConfig( + shardId, + threadPool, + indexSettings, + warmer, + store, + indexSettings.getMergePolicy(), + mapperService != null ? mapperService.indexAnalyzer() : null, + similarityService.similarity(mapperService), + codecService, + shardEventListener, + indexCache != null ? indexCache.query() : null, + cachingPolicy, + translogConfig, + IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), + Arrays.asList(refreshListeners, refreshPendingLocationListener), + internalRefreshListener, + indexSort, + circuitBreakerService, + globalCheckpointSupplier, + replicationTracker::getRetentionLeases, + () -> getOperationPrimaryTerm(), + tombstoneDocSupplier(), + shardRouting.primary() + ); } /** @@ -3918,8 +3924,8 @@ ReplicationTracker getReplicationTracker() { public boolean scheduledRefresh() { // skip if not primary shard. // TODO: Segrep - should split into primary/replica classes. - if ((indexSettings.isSegrepEnabled()) &&(shardRouting.primary() == false) ) { - return false; + if ((indexSettings.isSegrepEnabled()) && (shardRouting.primary() == false)) { + return false; } verifyNotClosed(); boolean listenerNeedsRefresh = refreshListeners.refreshNeeded(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index fe35711547fce..e88d123f50679 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.Map; @@ -151,7 +152,9 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position, + Arrays.toString(store.directory().listAll()); // TODO: Segrep - toggle this with a setting. With segrep we don't want this fsync we will only fsync // when a new checkpoint is received. - // store.directory().sync(Collections.singleton(temporaryFileName)); + if (store.indexSettings().isSegrepEnabled() == false) { + store.directory().sync(Collections.singleton(temporaryFileName)); + } IndexOutput remove = removeOpenIndexOutputs(name); assert remove == null || remove == indexOutput; // remove maybe null if we got finished } From e8437bd51807514369a2ee9c2801c4bb0958975b Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Mon, 7 Mar 2022 15:05:25 -0800 Subject: [PATCH 3/4] refactor isPrimary logic to isReadOnly Signed-off-by: Poojita Raj --- .../indices/IndexingMemoryControllerIT.java | 2 +- .../org/opensearch/index/IndexSettings.java | 4 ---- .../opensearch/index/engine/EngineConfig.java | 14 +++++------ .../index/engine/InternalEngine.java | 23 ++++++++++--------- .../opensearch/index/shard/IndexShard.java | 8 ++----- .../index/engine/InternalEngineTests.java | 6 ++--- .../index/shard/IndexShardTests.java | 2 +- .../IndexingMemoryControllerTests.java | 2 +- .../index/engine/EngineTestCase.java | 8 +++---- 9 files changed, 31 insertions(+), 38 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java index 9ca84ee55e95e..df96b3ee08fb3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java @@ -104,7 +104,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.retentionLeasesSupplier(), config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), - config.isPrimary() + config.isReadOnly() ); } diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index ae27768caa68d..481fc414e12e3 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -653,10 +653,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti this.indexMetadata = indexMetadata; numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null); isSegrepEnabled = settings.getAsBoolean(IndexMetadata.SETTING_SEGMENT_REPLICATION, false); -<<<<<<< HEAD -======= - ->>>>>>> 5950974c5a2 (toggle) this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings); this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings); this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings); diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index 3a9d91a1906fe..49b65dab29565 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -95,7 +95,7 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; - private boolean isPrimary; + private boolean isReadOnly; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -171,7 +171,7 @@ public EngineConfig( Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier, - boolean isPrimary + boolean isReadOnly ) { this( shardId, @@ -195,7 +195,7 @@ public EngineConfig( circuitBreakerService, globalCheckpointSupplier, retentionLeasesSupplier, - isPrimary, + isReadOnly, primaryTermSupplier, tombstoneDocSupplier ); @@ -226,7 +226,7 @@ public EngineConfig( CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, - boolean isPrimary, + boolean isReadOnly, LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier ) { @@ -241,7 +241,7 @@ public EngineConfig( this.codecService = codecService; this.eventListener = eventListener; codecName = indexSettings.getValue(INDEX_CODEC_SETTING); - this.isPrimary = isPrimary; + this.isReadOnly = isReadOnly; // We need to make the indexing buffer for this shard at least as large // as the amount of memory that is available for all engines on the // local node so that decisions to flush segments to disk are made by @@ -463,8 +463,8 @@ public LongSupplier getPrimaryTermSupplier() { return primaryTermSupplier; } - public boolean isPrimary() { - return isPrimary; + public boolean isReadOnly() { + return isReadOnly; } /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 6de79a8e91057..f279f2296166b 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -221,6 +221,10 @@ public class InternalEngine extends Engine { @Nullable private volatile String forceMergeUUID; + private boolean isReadOnlyReplica() { + return engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isReadOnly(); + } + public InternalEngine(EngineConfig engineConfig) { this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new); } @@ -277,7 +281,7 @@ public InternalEngine(EngineConfig engineConfig) { ); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); // TODO: Segrep - should have a separate read only engine rather than all this conditional logic. - if ((engineConfig.getIndexSettings().isSegrepEnabled()) && engineConfig.isPrimary() == false) { + if (isReadOnlyReplica()) { // Segrep - hack to make this engine read only and not use writer = null; historyUUID = null; @@ -346,7 +350,7 @@ public InternalEngine(EngineConfig engineConfig) { @Override public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException { - assert engineConfig.isPrimary() == false : "Only replicas should update Infos"; + assert engineConfig.isReadOnly() == true : "Only replicas should update Infos"; SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen); assert gen == infos.getGeneration(); externalReaderManager.internalReaderManager.updateSegments(infos); @@ -582,8 +586,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery translog.currentFileGeneration() ) ); - boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled(); - if ((segrep == false) || (segrep && engineConfig.isPrimary())) { + if (isReadOnlyReplica() == false) { flush(false, true); } translog.trimUnreferencedReaders(); @@ -653,8 +656,7 @@ public Translog.Location getTranslogLastWriteLocation() { private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { if (combinedDeletionPolicy.hasUnreferencedCommits()) { - boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled(); - if ((segrep == false) || (segrep && engineConfig.isPrimary())) { + if (isReadOnlyReplica() == false) { indexWriter.deleteUnusedFiles(); } } @@ -718,7 +720,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external private DirectoryReader getDirectoryReader() throws IOException { // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. - if (engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isPrimary() == false) { + if (isReadOnlyReplica()) { return DirectoryReader.open(store.directory()); } return DirectoryReader.open(indexWriter); @@ -1982,7 +1984,7 @@ public boolean shouldPeriodicallyFlush() { @Override public void flush(boolean force, boolean waitIfOngoing) throws EngineException { - if ((engineConfig.getIndexSettings().isSegrepEnabled()) && (engineConfig.isPrimary() == false)) { + if (isReadOnlyReplica()) { return; } ensureOpen(); @@ -2275,7 +2277,7 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En @Override public SegmentInfosRef getLatestSegmentInfosSafe() { - assert (engineConfig.isPrimary()); + assert (engineConfig.isReadOnly() == false); final SegmentInfos segmentInfos = getLatestSegmentInfos(); try { indexWriter.incRefDeleter(segmentInfos); @@ -2442,8 +2444,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed logger.trace("rollback indexWriter"); try { - boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled(); - if (segrep == false || (segrep && engineConfig.isPrimary())) { + if (isReadOnlyReplica() == false) { indexWriter.rollback(); } } catch (AlreadyClosedException ex) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5d6e98b90fa2d..be13aa8273dae 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2068,9 +2068,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. onSettingsChanged(); // TODO: Segrep - Fix - if (indexSettings.isSegrepEnabled() == false) { - assert assertSequenceNumbersInCommit(); - } + assert assertSequenceNumbersInCommit(); recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); } @@ -2718,9 +2716,7 @@ public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); // TODO: Segrep - Fix retention leases - if (indexSettings.isSegrepEnabled() == false) { - replicationTracker.renewPeerRecoveryRetentionLeases(); - } + replicationTracker.renewPeerRecoveryRetentionLeases(); final Tuple retentionLeases = getRetentionLeases(true); if (retentionLeases.v1()) { logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2()); diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index cb8cc7ef6ab03..0725445a39865 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -3440,7 +3440,7 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> RetentionLeases.EMPTY, primaryTerm::get, tombstoneDocSupplier(), - config.isPrimary() + config.isReadOnly() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -3483,7 +3483,7 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier Date: Tue, 8 Mar 2022 12:35:15 -0800 Subject: [PATCH 4/4] Update assert exception message --- .../main/java/org/opensearch/index/engine/InternalEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index f279f2296166b..a5ec9b437e00d 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -350,7 +350,7 @@ public InternalEngine(EngineConfig engineConfig) { @Override public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException { - assert engineConfig.isReadOnly() == true : "Only replicas should update Infos"; + assert engineConfig.isReadOnly() == true : "Only read-only replicas should update Infos"; SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen); assert gen == infos.getGeneration(); externalReaderManager.internalReaderManager.updateSegments(infos);