Skip to content

Commit

Permalink
refactor isPrimary logic to isReadOnly
Browse files Browse the repository at this point in the history
Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj committed Mar 7, 2022
1 parent 56ef936 commit e8437bd
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier(),
config.isPrimary()
config.isReadOnly()
);
}

Expand Down
4 changes: 0 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -653,10 +653,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);
isSegrepEnabled = settings.getAsBoolean(IndexMetadata.SETTING_SEGMENT_REPLICATION, false);
<<<<<<< HEAD
=======

>>>>>>> 5950974c5a2 (toggle)
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private boolean isPrimary;
private boolean isReadOnly;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -171,7 +171,7 @@ public EngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
boolean isPrimary
boolean isReadOnly
) {
this(
shardId,
Expand All @@ -195,7 +195,7 @@ public EngineConfig(
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
isPrimary,
isReadOnly,
primaryTermSupplier,
tombstoneDocSupplier
);
Expand Down Expand Up @@ -226,7 +226,7 @@ public EngineConfig(
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
boolean isPrimary,
boolean isReadOnly,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
Expand All @@ -241,7 +241,7 @@ public EngineConfig(
this.codecService = codecService;
this.eventListener = eventListener;
codecName = indexSettings.getValue(INDEX_CODEC_SETTING);
this.isPrimary = isPrimary;
this.isReadOnly = isReadOnly;
// We need to make the indexing buffer for this shard at least as large
// as the amount of memory that is available for all engines on the
// local node so that decisions to flush segments to disk are made by
Expand Down Expand Up @@ -463,8 +463,8 @@ public LongSupplier getPrimaryTermSupplier() {
return primaryTermSupplier;
}

public boolean isPrimary() {
return isPrimary;
public boolean isReadOnly() {
return isReadOnly;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ public class InternalEngine extends Engine {
@Nullable
private volatile String forceMergeUUID;

private boolean isReadOnlyReplica() {
return engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isReadOnly();
}

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
}
Expand Down Expand Up @@ -277,7 +281,7 @@ public InternalEngine(EngineConfig engineConfig) {
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
// TODO: Segrep - should have a separate read only engine rather than all this conditional logic.
if ((engineConfig.getIndexSettings().isSegrepEnabled()) && engineConfig.isPrimary() == false) {
if (isReadOnlyReplica()) {
// Segrep - hack to make this engine read only and not use
writer = null;
historyUUID = null;
Expand Down Expand Up @@ -346,7 +350,7 @@ public InternalEngine(EngineConfig engineConfig) {

@Override
public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {
assert engineConfig.isPrimary() == false : "Only replicas should update Infos";
assert engineConfig.isReadOnly() == true : "Only replicas should update Infos";
SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen);
assert gen == infos.getGeneration();
externalReaderManager.internalReaderManager.updateSegments(infos);
Expand Down Expand Up @@ -582,8 +586,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
translog.currentFileGeneration()
)
);
boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled();
if ((segrep == false) || (segrep && engineConfig.isPrimary())) {
if (isReadOnlyReplica() == false) {
flush(false, true);
}
translog.trimUnreferencedReaders();
Expand Down Expand Up @@ -653,8 +656,7 @@ public Translog.Location getTranslogLastWriteLocation() {

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled();
if ((segrep == false) || (segrep && engineConfig.isPrimary())) {
if (isReadOnlyReplica() == false) {
indexWriter.deleteUnusedFiles();
}
}
Expand Down Expand Up @@ -718,7 +720,7 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external

private DirectoryReader getDirectoryReader() throws IOException {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
if (engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isPrimary() == false) {
if (isReadOnlyReplica()) {
return DirectoryReader.open(store.directory());
}
return DirectoryReader.open(indexWriter);
Expand Down Expand Up @@ -1982,7 +1984,7 @@ public boolean shouldPeriodicallyFlush() {

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
if ((engineConfig.getIndexSettings().isSegrepEnabled()) && (engineConfig.isPrimary() == false)) {
if (isReadOnlyReplica()) {
return;
}
ensureOpen();
Expand Down Expand Up @@ -2275,7 +2277,7 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En

@Override
public SegmentInfosRef getLatestSegmentInfosSafe() {
assert (engineConfig.isPrimary());
assert (engineConfig.isReadOnly() == false);
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
Expand Down Expand Up @@ -2442,8 +2444,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
logger.trace("rollback indexWriter");
try {
boolean segrep = engineConfig.getIndexSettings().isSegrepEnabled();
if (segrep == false || (segrep && engineConfig.isPrimary())) {
if (isReadOnlyReplica() == false) {
indexWriter.rollback();
}
} catch (AlreadyClosedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2068,9 +2068,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
// TODO: Segrep - Fix
if (indexSettings.isSegrepEnabled() == false) {
assert assertSequenceNumbersInCommit();
}
assert assertSequenceNumbersInCommit();
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}

Expand Down Expand Up @@ -2718,9 +2716,7 @@ public void syncRetentionLeases() {
assert assertPrimaryMode();
verifyNotClosed();
// TODO: Segrep - Fix retention leases
if (indexSettings.isSegrepEnabled() == false) {
replicationTracker.renewPeerRecoveryRetentionLeases();
}
replicationTracker.renewPeerRecoveryRetentionLeases();
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
if (retentionLeases.v1()) {
logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3440,7 +3440,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
() -> RetentionLeases.EMPTY,
primaryTerm::get,
tombstoneDocSupplier(),
config.isPrimary()
config.isReadOnly()
);
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));

Expand Down Expand Up @@ -3483,7 +3483,7 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier<Retent
config.getCircuitBreakerService(),
config.getGlobalCheckpointSupplier(),
config.retentionLeasesSupplier(),
config.isPrimary(),
config.isReadOnly(),
config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier()
);
Expand Down Expand Up @@ -7105,7 +7105,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier(),
config.isPrimary()
config.isReadOnly()
);
try (InternalEngine engine = createEngine(configWithWarmer)) {
assertThat(warmedUpReaders, empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4457,7 +4457,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception {
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier(),
config.isPrimary()
config.isReadOnly()
);
return new InternalEngine(configWithWarmer);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier(),
config.isPrimary()
config.isReadOnly()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSuppl
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
tombstoneDocSupplier(),
config.isPrimary()
config.isReadOnly()
);
}

Expand Down Expand Up @@ -296,7 +296,7 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier(),
config.isPrimary()
config.isReadOnly()
);
}

Expand Down Expand Up @@ -324,7 +324,7 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier(),
config.isPrimary()
config.isReadOnly()
);
}

Expand Down Expand Up @@ -940,7 +940,7 @@ protected EngineConfig config(
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
tombstoneDocSupplier,
config.isPrimary()
config.isReadOnly()
);
}

Expand Down

0 comments on commit e8437bd

Please sign in to comment.