Skip to content

Commit

Permalink
Toggle replication strategy based on segrep index setting (#2318)
Browse files Browse the repository at this point in the history
* toggle

Signed-off-by: Poojita Raj <[email protected]>

index writer null exception

Signed-off-by: Poojita Raj <[email protected]>

included conditional logic

Signed-off-by: Poojita Raj <[email protected]>

internal listener fix

Signed-off-by: Poojita Raj <[email protected]>

* docrep working

Signed-off-by: Poojita Raj <[email protected]>

* refactor isPrimary logic to isReadOnly

Signed-off-by: Poojita Raj <[email protected]>

* Update assert exception message
  • Loading branch information
Poojita-Raj authored Mar 8, 2022
1 parent 4d25000 commit 9341099
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier(),
config.isPrimary()
config.isReadOnly()
);
}

Expand Down
4 changes: 1 addition & 3 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -915,9 +915,7 @@ private void maybeRefreshEngine(boolean force) {
if (indexSettings.getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
if (shard.routingEntry().primary()) {
shard.scheduledRefresh();
}
shard.scheduledRefresh();
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ public int getNumberOfReplicas() {
}

public boolean isSegrepEnabled() {
return isSegrepEnabled;
return Boolean.TRUE.equals(isSegrepEnabled);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private boolean isPrimary;
private boolean isReadOnly;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -171,7 +171,7 @@ public EngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
boolean isPrimary
boolean isReadOnly
) {
this(
shardId,
Expand All @@ -195,7 +195,7 @@ public EngineConfig(
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
isPrimary,
isReadOnly,
primaryTermSupplier,
tombstoneDocSupplier
);
Expand Down Expand Up @@ -226,7 +226,7 @@ public EngineConfig(
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
boolean isPrimary,
boolean isReadOnly,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
Expand All @@ -241,7 +241,7 @@ public EngineConfig(
this.codecService = codecService;
this.eventListener = eventListener;
codecName = indexSettings.getValue(INDEX_CODEC_SETTING);
this.isPrimary = isPrimary;
this.isReadOnly = isReadOnly;
// We need to make the indexing buffer for this shard at least as large
// as the amount of memory that is available for all engines on the
// local node so that decisions to flush segments to disk are made by
Expand Down Expand Up @@ -463,8 +463,8 @@ public LongSupplier getPrimaryTermSupplier() {
return primaryTermSupplier;
}

public boolean isPrimary() {
return isPrimary;
public boolean isReadOnly() {
return isReadOnly;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ public class InternalEngine extends Engine {
@Nullable
private volatile String forceMergeUUID;

private boolean isReadOnlyReplica() {
return engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isReadOnly();
}

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
}
Expand Down Expand Up @@ -277,17 +281,17 @@ public InternalEngine(EngineConfig engineConfig) {
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
// TODO: Segrep - should have a separate read only engine rather than all this conditional logic.
if (engineConfig.isPrimary()) {
if (isReadOnlyReplica()) {
// Segrep - hack to make this engine read only and not use
writer = null;
historyUUID = null;
forceMergeUUID = null;
} else {
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
} else {
// Segrep - hack to make this engine read only and not use
writer = null;
historyUUID = null;
forceMergeUUID = null;
}
indexWriter = writer;
} catch (IOException | TranslogCorruptedException e) {
Expand Down Expand Up @@ -346,7 +350,7 @@ public InternalEngine(EngineConfig engineConfig) {

@Override
public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {
assert engineConfig.isPrimary() == false : "Only replicas should update Infos";
assert engineConfig.isReadOnly() == true : "Only read-only replicas should update Infos";
SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen);
assert gen == infos.getGeneration();
externalReaderManager.internalReaderManager.updateSegments(infos);
Expand Down Expand Up @@ -582,7 +586,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
translog.currentFileGeneration()
)
);
if (engineConfig.isPrimary()) {
if (isReadOnlyReplica() == false) {
flush(false, true);
}
translog.trimUnreferencedReaders();
Expand Down Expand Up @@ -652,7 +656,7 @@ public Translog.Location getTranslogLastWriteLocation() {

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
if (engineConfig.isPrimary()) {
if (isReadOnlyReplica() == false) {
indexWriter.deleteUnusedFiles();
}
}
Expand Down Expand Up @@ -715,8 +719,8 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
}

private DirectoryReader getDirectoryReader() throws IOException {
// replicas should create the reader from store, we don't want an open IW on replicas.
if (engineConfig.isPrimary() == false) {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
if (isReadOnlyReplica()) {
return DirectoryReader.open(store.directory());
}
return DirectoryReader.open(indexWriter);
Expand Down Expand Up @@ -1980,7 +1984,7 @@ public boolean shouldPeriodicallyFlush() {

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
if (engineConfig.isPrimary() == false) {
if (isReadOnlyReplica()) {
return;
}
ensureOpen();
Expand Down Expand Up @@ -2273,7 +2277,7 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En

@Override
public SegmentInfosRef getLatestSegmentInfosSafe() {
assert (engineConfig.isPrimary());
assert (engineConfig.isReadOnly() == false);
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
Expand Down Expand Up @@ -2440,7 +2444,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
logger.trace("rollback indexWriter");
try {
if (engineConfig.isPrimary()) {
if (isReadOnlyReplica() == false) {
indexWriter.rollback();
}
} catch (AlreadyClosedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@

package org.opensearch.index.engine;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
Expand All @@ -44,11 +40,14 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.search.ReferenceManager;

import org.apache.lucene.search.SearcherManager;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Utility class to safely share {@link OpenSearchDirectoryReader} instances across
* multiple threads, while periodically reopening. This class ensures each
Expand Down
74 changes: 42 additions & 32 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,7 @@ public Engine.IndexResult applyIndexOperationOnReplica(
boolean isRetry,
SourceToParse sourceToParse
) throws IOException {
Boolean isSegRepEnabled = indexSettings.isSegrepEnabled();
if (isSegRepEnabled != null && isSegRepEnabled) {
if (indexSettings.isSegrepEnabled()) {
Engine.Index index;
try {
index = parseSourceAndPrepareIndex(
Expand Down Expand Up @@ -1278,7 +1277,7 @@ public void refresh(String source) {
*/
public long getWritingBytes() {
// TODO: Segrep: hack - if not the primary our IW is null and this blows up.
if (shardRouting.primary() == false) {
if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) {
return 0L;
}
Engine engine = getEngineOrNull();
Expand Down Expand Up @@ -2030,7 +2029,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
// TODO: Segrep - fix initial recovery stages from ReplicationTarget.
// recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
if (indexSettings.isSegrepEnabled() == false) {
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().skipTranslogRecovery();
Expand Down Expand Up @@ -2067,7 +2068,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
// TODO: Segrep - Fix
// assert assertSequenceNumbersInCommit();
assert assertSequenceNumbersInCommit();
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}

Expand Down Expand Up @@ -2239,7 +2240,7 @@ protected final void verifyActive() throws IllegalIndexShardStateException {
* Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed
*/
public long getIndexBufferRAMBytesUsed() {
if (shardRouting.primary() == false) {
if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) {
return 0;
}
Engine engine = getEngineOrNull();
Expand Down Expand Up @@ -2715,7 +2716,7 @@ public void syncRetentionLeases() {
assert assertPrimaryMode();
verifyNotClosed();
// TODO: Segrep - Fix retention leases
// replicationTracker.renewPeerRecoveryRetentionLeases();
replicationTracker.renewPeerRecoveryRetentionLeases();
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
if (retentionLeases.v1()) {
logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
Expand Down Expand Up @@ -3073,32 +3074,35 @@ public void startRecovery(
case PEER:
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
IndexShard indexShard = this;
segmentReplicationReplicaService.prepareForReplication(
this,
recoveryState.getTargetNode(),
recoveryState.getSourceNode(),
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
replicationListener.onReplicationDone(replicationState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}
if (indexSettings.isSegrepEnabled()) {
IndexShard indexShard = this;
segmentReplicationReplicaService.prepareForReplication(
this,
recoveryState.getTargetNode(),
recoveryState.getSourceNode(),
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
replicationListener.onReplicationDone(replicationState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}

@Override
public void onFailure(Exception e) {
replicationListener.onReplicationFailure(
replicationState,
new ReplicationFailedException(indexShard, e),
true
);
@Override
public void onFailure(Exception e) {
replicationListener.onReplicationFailure(
replicationState,
new ReplicationFailedException(indexShard, e),
true
);
}
}
}
);
);
} else {
peerRecoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
}
} catch (Exception e) {
logger.error("Error preparing the shard for Segment replication", e);
failShard("corrupted preexisting index", e);
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
}
Expand Down Expand Up @@ -3295,6 +3299,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
this.warmer.warm(reader);
}
};
final List<ReferenceManager.RefreshListener> internalRefreshListener;
if (indexSettings.isSegrepEnabled()) {
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
} else {
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
}
return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
Expand All @@ -3311,7 +3321,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener),
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
Expand Down Expand Up @@ -3910,7 +3920,7 @@ ReplicationTracker getReplicationTracker() {
public boolean scheduledRefresh() {
// skip if not primary shard.
// TODO: Segrep - should split into primary/replica classes.
if (shardRouting.primary() == false) {
if ((indexSettings.isSegrepEnabled()) && (shardRouting.primary() == false)) {
return false;
}
verifyNotClosed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -151,7 +152,9 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
+ Arrays.toString(store.directory().listAll());
// TODO: Segrep - toggle this with a setting. With segrep we don't want this fsync we will only fsync
// when a new checkpoint is received.
// store.directory().sync(Collections.singleton(temporaryFileName));
if (store.indexSettings().isSegrepEnabled() == false) {
store.directory().sync(Collections.singleton(temporaryFileName));
}
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
Expand Down
Loading

0 comments on commit 9341099

Please sign in to comment.