Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Toggle replication strategy based on segrep index setting #2318

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(),
config.getTombstoneDocSupplier(),
config.isPrimary()
config.isReadOnly()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,9 +915,7 @@ private void maybeRefreshEngine(boolean force) {
if (indexSettings.getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
if (shard.routingEntry().primary()) {
shard.scheduledRefresh();
}
shard.scheduledRefresh();
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// fine - continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ public int getNumberOfReplicas() {
}

public boolean isSegrepEnabled() {
return isSegrepEnabled;
return Boolean.TRUE.equals(isSegrepEnabled);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - this isn't required bc we always initialize this variable in the constructor.
isSegrepEnabled = settings.getAsBoolean(IndexMetadata.SETTING_SEGMENT_REPLICATION, false);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True but leaving it in to convert to boolean which is faster since we don't need a thread safe implementation.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private boolean isPrimary;
private boolean isReadOnly;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
Expand Down Expand Up @@ -171,7 +171,7 @@ public EngineConfig(
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
boolean isPrimary
boolean isReadOnly
) {
this(
shardId,
Expand All @@ -195,7 +195,7 @@ public EngineConfig(
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
isPrimary,
isReadOnly,
primaryTermSupplier,
tombstoneDocSupplier
);
Expand Down Expand Up @@ -226,7 +226,7 @@ public EngineConfig(
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
boolean isPrimary,
boolean isReadOnly,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
Expand All @@ -241,7 +241,7 @@ public EngineConfig(
this.codecService = codecService;
this.eventListener = eventListener;
codecName = indexSettings.getValue(INDEX_CODEC_SETTING);
this.isPrimary = isPrimary;
this.isReadOnly = isReadOnly;
// We need to make the indexing buffer for this shard at least as large
// as the amount of memory that is available for all engines on the
// local node so that decisions to flush segments to disk are made by
Expand Down Expand Up @@ -463,8 +463,8 @@ public LongSupplier getPrimaryTermSupplier() {
return primaryTermSupplier;
}

public boolean isPrimary() {
return isPrimary;
public boolean isReadOnly() {
return isReadOnly;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ public class InternalEngine extends Engine {
@Nullable
private volatile String forceMergeUUID;

private boolean isReadOnlyReplica() {
return engineConfig.getIndexSettings().isSegrepEnabled() && engineConfig.isReadOnly();
}

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
}
Expand Down Expand Up @@ -277,17 +281,17 @@ public InternalEngine(EngineConfig engineConfig) {
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
// TODO: Segrep - should have a separate read only engine rather than all this conditional logic.
if (engineConfig.isPrimary()) {
if (isReadOnlyReplica()) {
// Segrep - hack to make this engine read only and not use
writer = null;
historyUUID = null;
forceMergeUUID = null;
} else {
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
} else {
// Segrep - hack to make this engine read only and not use
writer = null;
historyUUID = null;
forceMergeUUID = null;
}
indexWriter = writer;
} catch (IOException | TranslogCorruptedException e) {
Expand Down Expand Up @@ -346,7 +350,7 @@ public InternalEngine(EngineConfig engineConfig) {

@Override
public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {
assert engineConfig.isPrimary() == false : "Only replicas should update Infos";
assert engineConfig.isReadOnly() == true : "Only read-only replicas should update Infos";
SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen);
assert gen == infos.getGeneration();
externalReaderManager.internalReaderManager.updateSegments(infos);
Expand Down Expand Up @@ -582,7 +586,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
translog.currentFileGeneration()
)
);
if (engineConfig.isPrimary()) {
if (isReadOnlyReplica() == false) {
flush(false, true);
}
translog.trimUnreferencedReaders();
Expand Down Expand Up @@ -652,7 +656,7 @@ public Translog.Location getTranslogLastWriteLocation() {

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
if (engineConfig.isPrimary()) {
if (isReadOnlyReplica() == false) {
indexWriter.deleteUnusedFiles();
}
}
Expand Down Expand Up @@ -715,8 +719,8 @@ private ExternalReaderManager createReaderManager(RefreshWarmerListener external
}

private DirectoryReader getDirectoryReader() throws IOException {
// replicas should create the reader from store, we don't want an open IW on replicas.
if (engineConfig.isPrimary() == false) {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
if (isReadOnlyReplica()) {
return DirectoryReader.open(store.directory());
}
return DirectoryReader.open(indexWriter);
Expand Down Expand Up @@ -1980,7 +1984,7 @@ public boolean shouldPeriodicallyFlush() {

@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
if (engineConfig.isPrimary() == false) {
if (isReadOnlyReplica()) {
return;
}
ensureOpen();
Expand Down Expand Up @@ -2273,7 +2277,7 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En

@Override
public SegmentInfosRef getLatestSegmentInfosSafe() {
assert (engineConfig.isPrimary());
assert (engineConfig.isReadOnly() == false);
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
Expand Down Expand Up @@ -2440,7 +2444,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
// no need to commit in this case!, we snapshot before we close the shard, so translog and all sync'ed
logger.trace("rollback indexWriter");
try {
if (engineConfig.isPrimary()) {
if (isReadOnlyReplica() == false) {
indexWriter.rollback();
}
} catch (AlreadyClosedException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@

package org.opensearch.index.engine;

import java.io.IOException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this reorder intentional? Think this may be IDE.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left it in since this seems to be the standard order followed by all other files

import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
Expand All @@ -44,11 +40,14 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.search.ReferenceManager;

import org.apache.lucene.search.SearcherManager;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Utility class to safely share {@link OpenSearchDirectoryReader} instances across
* multiple threads, while periodically reopening. This class ensures each
Expand Down
74 changes: 42 additions & 32 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,7 @@ public Engine.IndexResult applyIndexOperationOnReplica(
boolean isRetry,
SourceToParse sourceToParse
) throws IOException {
Boolean isSegRepEnabled = indexSettings.isSegrepEnabled();
if (isSegRepEnabled != null && isSegRepEnabled) {
if (indexSettings.isSegrepEnabled()) {
Engine.Index index;
try {
index = parseSourceAndPrepareIndex(
Expand Down Expand Up @@ -1278,7 +1277,7 @@ public void refresh(String source) {
*/
public long getWritingBytes() {
// TODO: Segrep: hack - if not the primary our IW is null and this blows up.
if (shardRouting.primary() == false) {
if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) {
return 0L;
}
Engine engine = getEngineOrNull();
Expand Down Expand Up @@ -2030,7 +2029,9 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
// TODO: Segrep - fix initial recovery stages from ReplicationTarget.
// recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
if (indexSettings.isSegrepEnabled() == false) {
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().skipTranslogRecovery();
Expand Down Expand Up @@ -2067,7 +2068,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
// TODO: Segrep - Fix
// assert assertSequenceNumbersInCommit();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method call may be ok to uncomment now with segrep enabled. We can take that as a separate change.

assert assertSequenceNumbersInCommit();
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}

Expand Down Expand Up @@ -2239,7 +2240,7 @@ protected final void verifyActive() throws IllegalIndexShardStateException {
* Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed
*/
public long getIndexBufferRAMBytesUsed() {
if (shardRouting.primary() == false) {
if (indexSettings.isSegrepEnabled() && (shardRouting.primary() == false)) {
return 0;
}
Engine engine = getEngineOrNull();
Expand Down Expand Up @@ -2715,7 +2716,7 @@ public void syncRetentionLeases() {
assert assertPrimaryMode();
verifyNotClosed();
// TODO: Segrep - Fix retention leases
// replicationTracker.renewPeerRecoveryRetentionLeases();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving a note here similar to the above - I think this may be ok to uncomment now with segrep enabled. This was commented before we were correctly wiring up retention leases for replicas.

replicationTracker.renewPeerRecoveryRetentionLeases();
final Tuple<Boolean, RetentionLeases> retentionLeases = getRetentionLeases(true);
if (retentionLeases.v1()) {
logger.trace("syncing retention leases [{}] after expiration check", retentionLeases.v2());
Expand Down Expand Up @@ -3073,32 +3074,35 @@ public void startRecovery(
case PEER:
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
IndexShard indexShard = this;
segmentReplicationReplicaService.prepareForReplication(
this,
recoveryState.getTargetNode(),
recoveryState.getSourceNode(),
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
replicationListener.onReplicationDone(replicationState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}
if (indexSettings.isSegrepEnabled()) {
IndexShard indexShard = this;
segmentReplicationReplicaService.prepareForReplication(
this,
recoveryState.getTargetNode(),
recoveryState.getSourceNode(),
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
replicationListener.onReplicationDone(replicationState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}

@Override
public void onFailure(Exception e) {
replicationListener.onReplicationFailure(
replicationState,
new ReplicationFailedException(indexShard, e),
true
);
@Override
public void onFailure(Exception e) {
replicationListener.onReplicationFailure(
replicationState,
new ReplicationFailedException(indexShard, e),
true
);
}
}
}
);
);
} else {
peerRecoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
}
} catch (Exception e) {
logger.error("Error preparing the shard for Segment replication", e);
failShard("corrupted preexisting index", e);
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
}
Expand Down Expand Up @@ -3295,6 +3299,12 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
this.warmer.warm(reader);
}
};
final List<ReferenceManager.RefreshListener> internalRefreshListener;
if (indexSettings.isSegrepEnabled()) {
internalRefreshListener = Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener);
} else {
internalRefreshListener = Collections.singletonList(new RefreshMetricUpdater(refreshMetric));
}
return this.engineConfigFactory.newEngineConfig(
shardId,
threadPool,
Expand All @@ -3311,7 +3321,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Arrays.asList(refreshListeners, refreshPendingLocationListener),
Arrays.asList(new RefreshMetricUpdater(refreshMetric), checkpointRefreshListener),
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
Expand Down Expand Up @@ -3910,7 +3920,7 @@ ReplicationTracker getReplicationTracker() {
public boolean scheduledRefresh() {
// skip if not primary shard.
// TODO: Segrep - should split into primary/replica classes.
if (shardRouting.primary() == false) {
if ((indexSettings.isSegrepEnabled()) && (shardRouting.primary() == false)) {
return false;
}
verifyNotClosed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -151,7 +152,9 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position,
+ Arrays.toString(store.directory().listAll());
// TODO: Segrep - toggle this with a setting. With segrep we don't want this fsync we will only fsync
// when a new checkpoint is received.
// store.directory().sync(Collections.singleton(temporaryFileName));
if (store.indexSettings().isSegrepEnabled() == false) {
store.directory().sync(Collections.singleton(temporaryFileName));
}
IndexOutput remove = removeOpenIndexOutputs(name);
assert remove == null || remove == indexOutput; // remove maybe null if we got finished
}
Expand Down
Loading