diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java index 9ca84ee55e95e..df96b3ee08fb3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndexingMemoryControllerIT.java @@ -104,7 +104,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.retentionLeasesSupplier(), config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier(), - config.isPrimary() + config.isReadOnly() ); } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index c1d3f4ddab147..8f588e615e34d 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -915,9 +915,7 @@ private void maybeRefreshEngine(boolean force) { if (indexSettings.getRefreshInterval().millis() > 0 || force) { for (IndexShard shard : this.shards.values()) { try { - if (shard.routingEntry().primary()) { - shard.scheduledRefresh(); - } + shard.scheduledRefresh(); } catch (IndexShardClosedException | AlreadyClosedException ex) { // fine - continue; } diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 914531e2e5acd..481fc414e12e3 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -883,7 +883,7 @@ public int getNumberOfReplicas() { } public boolean isSegrepEnabled() { - return isSegrepEnabled; + return Boolean.TRUE.equals(isSegrepEnabled); } /** diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java index 3a9d91a1906fe..49b65dab29565 100644 --- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java @@ -95,7 +95,7 @@ public final class EngineConfig { private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; private final Supplier retentionLeasesSupplier; - private boolean isPrimary; + private boolean isReadOnly; /** * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been @@ -171,7 +171,7 @@ public EngineConfig( Supplier retentionLeasesSupplier, LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier, - boolean isPrimary + boolean isReadOnly ) { this( shardId, @@ -195,7 +195,7 @@ public EngineConfig( circuitBreakerService, globalCheckpointSupplier, retentionLeasesSupplier, - isPrimary, + isReadOnly, primaryTermSupplier, tombstoneDocSupplier ); @@ -226,7 +226,7 @@ public EngineConfig( CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier, Supplier retentionLeasesSupplier, - boolean isPrimary, + boolean isReadOnly, LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier ) { @@ -241,7 +241,7 @@ public EngineConfig( this.codecService = codecService; this.eventListener = eventListener; codecName = indexSettings.getValue(INDEX_CODEC_SETTING); - this.isPrimary = isPrimary; + this.isReadOnly = isReadOnly; // We need to make the indexing buffer for this shard at least as large // as the amount of memory that is available for all engines on the // local node so that decisions to flush segments to disk are made by @@ -463,8 +463,8 @@ public LongSupplier getPrimaryTermSupplier() { return primaryTermSupplier; } - public boolean isPrimary() { - return isPrimary; + public boolean isReadOnly() { + return isReadOnly; } /** diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 3b69a8a25753e..a5ec9b437e00d 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -221,6 +221,10 @@ public class InternalEngine extends Engine { @Nullable private volatile String forceMergeUUID; + private boolean isReadOnlyReplica() { + return engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isReadOnly(); + } + public InternalEngine(EngineConfig engineConfig) { this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new); } @@ -277,17 +281,17 @@ public InternalEngine(EngineConfig engineConfig) { ); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); // TODO: Segrep - should have a separate read only engine rather than all this conditional logic. - if (engineConfig.isPrimary()) { + if (isReadOnlyReplica()) { + // Segrep - hack to make this engine read only and not use + writer = null; + historyUUID = null; + forceMergeUUID = null; + } else { writer = createWriter(); bootstrapAppendOnlyInfoFromWriter(writer); final Map commitData = commitDataAsMap(writer); historyUUID = loadHistoryUUID(commitData); forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); - } else { - // Segrep - hack to make this engine read only and not use - writer = null; - historyUUID = null; - forceMergeUUID = null; } indexWriter = writer; } catch (IOException | TranslogCorruptedException e) { @@ -346,7 +350,7 @@ public InternalEngine(EngineConfig engineConfig) { @Override public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException { - assert engineConfig.isPrimary() == false : "Only replicas should update Infos"; + assert engineConfig.isReadOnly() == true : "Only read-only replicas should update Infos"; SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen); assert gen == infos.getGeneration(); externalReaderManager.internalReaderManager.updateSegments(infos); @@ -582,7 +586,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery translog.currentFileGeneration() ) ); - if (engineConfig.isPrimary()) { + if (isReadOnlyReplica() == false) { flush(false, true); } translog.trimUnreferencedReaders(); @@ -652,7 +656,7 @@ public Translog.Location getTranslogLastWriteLocation() { private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { if (combinedDeletionPolicy.hasUnreferencedCommits()) { - if (engineConfig.isPrimary()) { + if (isReadOnlyReplica() == false) { indexWriter.deleteUnusedFiles(); } } @@ -715,8 +719,8 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external } private DirectoryReader getDirectoryReader() throws IOException { - // replicas should create the reader from store, we don't want an open IW on replicas. - if (engineConfig.isPrimary() == false) { + // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. + if (isReadOnlyReplica()) { return DirectoryReader.open(store.directory()); } return DirectoryReader.open(indexWriter); @@ -1980,7 +1984,7 @@ public boolean shouldPeriodicallyFlush() { @Override public void flush(boolean force, boolean waitIfOngoing) throws EngineException { - if (engineConfig.isPrimary() == false) { + if (isReadOnlyReplica()) { return; } ensureOpen(); @@ -2273,7 +2277,7 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En @Override public SegmentInfosRef getLatestSegmentInfosSafe() { - assert (engineConfig.isPrimary()); + assert (engineConfig.isReadOnly() == false); final SegmentInfos segmentInfos = getLatestSegmentInfos(); try { indexWriter.incRefDeleter(segmentInfos); @@ -2440,7 +2444,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { // no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed logger.trace("rollback indexWriter"); try { - if (engineConfig.isPrimary()) { + if (isReadOnlyReplica() == false) { indexWriter.rollback(); } } catch (AlreadyClosedException ex) { diff --git a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java index da2365a94aa8a..a0f9f9c5d7a5d 100644 --- a/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/OpenSearchReaderManager.java @@ -32,10 +32,6 @@ package org.opensearch.index.engine; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.DirectoryReader; @@ -44,11 +40,14 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.search.ReferenceManager; - import org.apache.lucene.search.SearcherManager; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + /** * Utility class to safely share {@link OpenSearchDirectoryReader} instances across * multiple threads, while periodically reopening. This class ensures each diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d4753e7763e6e..be13aa8273dae 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -845,8 +845,7 @@ public Engine.IndexResult applyIndexOperationOnReplica( boolean isRetry, SourceToParse sourceToParse ) throws IOException { - Boolean isSegRepEnabled = indexSettings.isSegrepEnabled(); - if (isSegRepEnabled != null && isSegRepEnabled) { + if (indexSettings.isSegrepEnabled()) { Engine.Index index; try { index = parseSourceAndPrepareIndex( @@ -1278,7 +1277,7 @@ public void refresh(String source) { */ public long getWritingBytes() { // TODO: Segrep: hack - if not the primary our IW is null and this blows up. - if (shardRouting.primary() == false) { + if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) { return 0L; } Engine engine = getEngineOrNull(); @@ -2030,7 +2029,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException { public void openEngineAndSkipTranslogRecovery() throws IOException { assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]"; // TODO: Segrep - fix initial recovery stages from ReplicationTarget. - // recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + if (indexSettings.isSegrepEnabled() == false) { + recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); + } loadGlobalCheckpointToReplicationTracker(); innerOpenEngineAndTranslog(replicationTracker); getEngine().skipTranslogRecovery(); @@ -2067,7 +2068,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine. onSettingsChanged(); // TODO: Segrep - Fix - // assert assertSequenceNumbersInCommit(); + assert assertSequenceNumbersInCommit(); recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG); } @@ -2239,7 +2240,7 @@ protected final void verifyActive() throws IllegalIndexShardStateException { * Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */ public long getIndexBufferRAMBytesUsed() { - if (shardRouting.primary() == false) { + if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) { return 0; } Engine engine = getEngineOrNull(); @@ -2715,7 +2716,7 @@ public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); // TODO: Segrep - Fix retention leases - // replicationTracker.renewPeerRecoveryRetentionLeases(); + replicationTracker.renewPeerRecoveryRetentionLeases(); final Tuple retentionLeases = getRetentionLeases(true); if (retentionLeases.v1()) { logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2()); @@ -3073,32 +3074,35 @@ public void startRecovery( case PEER: try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); - IndexShard indexShard = this; - segmentReplicationReplicaService.prepareForReplication( - this, - recoveryState.getTargetNode(), - recoveryState.getSourceNode(), - new ActionListener() { - @Override - public void onResponse(TrackShardResponse unused) { - replicationListener.onReplicationDone(replicationState); - recoveryState.getIndex().setFileDetailsComplete(); - finalizeRecovery(); - postRecovery("Shard setup complete."); - } + if (indexSettings.isSegrepEnabled()) { + IndexShard indexShard = this; + segmentReplicationReplicaService.prepareForReplication( + this, + recoveryState.getTargetNode(), + recoveryState.getSourceNode(), + new ActionListener() { + @Override + public void onResponse(TrackShardResponse unused) { + replicationListener.onReplicationDone(replicationState); + recoveryState.getIndex().setFileDetailsComplete(); + finalizeRecovery(); + postRecovery("Shard setup complete."); + } - @Override - public void onFailure(Exception e) { - replicationListener.onReplicationFailure( - replicationState, - new ReplicationFailedException(indexShard, e), - true - ); + @Override + public void onFailure(Exception e) { + replicationListener.onReplicationFailure( + replicationState, + new ReplicationFailedException(indexShard, e), + true + ); + } } - } - ); + ); + } else { + peerRecoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener); + } } catch (Exception e) { - logger.error("Error preparing the shard for Segment replication", e); failShard("corrupted preexisting index", e); recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true); } @@ -3295,6 +3299,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { this.warmer.warm(reader); } }; + final List internalRefreshListener; + if (indexSettings.isSegrepEnabled()) { + internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener); + } else { + internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric)); + } return this.engineConfigFactory.newEngineConfig( shardId, threadPool, @@ -3311,7 +3321,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Arrays.asList(refreshListeners, refreshPendingLocationListener), - Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener), + internalRefreshListener, indexSort, circuitBreakerService, globalCheckpointSupplier, @@ -3910,7 +3920,7 @@ ReplicationTracker getReplicationTracker() { public boolean scheduledRefresh() { // skip if not primary shard. // TODO: Segrep - should split into primary/replica classes. - if (shardRouting.primary() == false) { + if ((indexSettings.isSegrepEnabled()) && (shardRouting.primary() == false)) { return false; } verifyNotClosed(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index fe35711547fce..e88d123f50679 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -48,6 +48,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.Map; @@ -151,7 +152,9 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position, + Arrays.toString(store.directory().listAll()); // TODO: Segrep - toggle this with a setting. With segrep we don't want this fsync we will only fsync // when a new checkpoint is received. - // store.directory().sync(Collections.singleton(temporaryFileName)); + if (store.indexSettings().isSegrepEnabled() == false) { + store.directory().sync(Collections.singleton(temporaryFileName)); + } IndexOutput remove = removeOpenIndexOutputs(name); assert remove == null || remove == indexOutput; // remove maybe null if we got finished } diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index cb8cc7ef6ab03..0725445a39865 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -3440,7 +3440,7 @@ public void testRecoverFromForeignTranslog() throws IOException { () -> RetentionLeases.EMPTY, primaryTerm::get, tombstoneDocSupplier(), - config.isPrimary() + config.isReadOnly() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -3483,7 +3483,7 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier