Skip to content

Commit

Permalink
Add backpressure in write path on remote segments lag
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed May 8, 2023
1 parent 1eb8c5d commit 048398f
Show file tree
Hide file tree
Showing 18 changed files with 375 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ public static final IndexShard newIndexShard(
cbs,
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.SourceToParse;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
Expand Down Expand Up @@ -135,6 +136,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
private final UpdateHelper updateHelper;
private final MappingUpdatedAction mappingUpdatedAction;
private final SegmentReplicationPressureService segmentReplicationPressureService;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;

/**
* This action is used for performing primary term validation. With remote translog enabled, the translogs would
Expand All @@ -158,6 +160,7 @@ public TransportShardBulkAction(
ActionFilters actionFilters,
IndexingPressureService indexingPressureService,
SegmentReplicationPressureService segmentReplicationPressureService,
RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService,
SystemIndices systemIndices
) {
super(
Expand All @@ -179,6 +182,7 @@ public TransportShardBulkAction(
this.updateHelper = updateHelper;
this.mappingUpdatedAction = mappingUpdatedAction;
this.segmentReplicationPressureService = segmentReplicationPressureService;
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;

this.transportPrimaryTermValidationAction = ACTION_NAME + "[validate_primary_term]";

Expand Down Expand Up @@ -528,8 +532,15 @@ private void finishRequest() {

@Override
protected Releasable checkPrimaryLimits(BulkShardRequest request, boolean rerouteWasLocal, boolean localRerouteInitiatedByNodeClient) {
if (force(request) == false && segmentReplicationPressureService.isSegmentReplicationBackpressureEnabled()) {
segmentReplicationPressureService.isSegrepLimitBreached(request.shardId());
if (force(request) == false) {
if (segmentReplicationPressureService.isSegmentReplicationBackpressureEnabled()) {
segmentReplicationPressureService.isSegrepLimitBreached(request.shardId());
}
// TODO - See if there is a way to have an common pressure service interface which is implemented by
// segment replication and remote refresh segment pressure service
if (remoteRefreshSegmentPressureService.isSegmentsUploadBackpressureEnabled()) {
remoteRefreshSegmentPressureService.validateSegmentsUploadLag(request.shardId());
}
}
return super.checkPrimaryLimits(request, rerouteWasLocal, localRerouteInitiatedByNodeClient);
}
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.SearchIndexNameMatcher;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -438,7 +439,8 @@ public synchronized IndexShard createShard(
final ShardRouting routing,
final Consumer<ShardId> globalCheckpointSyncer,
final RetentionLeaseSyncer retentionLeaseSyncer,
final SegmentReplicationCheckpointPublisher checkpointPublisher
final SegmentReplicationCheckpointPublisher checkpointPublisher,
final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
) throws IOException {
Objects.requireNonNull(retentionLeaseSyncer);
/*
Expand Down Expand Up @@ -506,7 +508,8 @@ public synchronized IndexShard createShard(
circuitBreakerService,
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore
remoteStore,
remoteRefreshSegmentPressureService
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,28 +169,28 @@ ShardId getShardId() {
return shardId;
}

long getLocalRefreshSeqNo() {
public long getLocalRefreshSeqNo() {
return localRefreshSeqNo;
}

void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
assert localRefreshSeqNo > this.localRefreshSeqNo : "newLocalRefreshSeqNo="
public void updateLocalRefreshSeqNo(long localRefreshSeqNo) {
assert localRefreshSeqNo >= this.localRefreshSeqNo : "newLocalRefreshSeqNo="
+ localRefreshSeqNo
+ ">="
+ " < "
+ "currentLocalRefreshSeqNo="
+ this.localRefreshSeqNo;
this.localRefreshSeqNo = localRefreshSeqNo;
computeRefreshSeqNoLag();
}

long getLocalRefreshTimeMs() {
public long getLocalRefreshTimeMs() {
return localRefreshTimeMs;
}

void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
assert localRefreshTimeMs > this.localRefreshTimeMs : "newLocalRefreshTimeMs="
public void updateLocalRefreshTimeMs(long localRefreshTimeMs) {
assert localRefreshTimeMs >= this.localRefreshTimeMs : "newLocalRefreshTimeMs="
+ localRefreshTimeMs
+ ">="
+ " < "
+ "currentLocalRefreshTimeMs="
+ this.localRefreshTimeMs;
this.localRefreshTimeMs = localRefreshTimeMs;
Expand All @@ -201,10 +201,10 @@ long getRemoteRefreshSeqNo() {
return remoteRefreshSeqNo;
}

void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) {
assert remoteRefreshSeqNo > this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo="
public void updateRemoteRefreshSeqNo(long remoteRefreshSeqNo) {
assert remoteRefreshSeqNo >= this.remoteRefreshSeqNo : "newRemoteRefreshSeqNo="
+ remoteRefreshSeqNo
+ ">="
+ " < "
+ "currentRemoteRefreshSeqNo="
+ this.remoteRefreshSeqNo;
this.remoteRefreshSeqNo = remoteRefreshSeqNo;
Expand All @@ -215,10 +215,10 @@ long getRemoteRefreshTimeMs() {
return remoteRefreshTimeMs;
}

void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
assert remoteRefreshTimeMs > this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs="
public void updateRemoteRefreshTimeMs(long remoteRefreshTimeMs) {
assert remoteRefreshTimeMs >= this.remoteRefreshTimeMs : "newRemoteRefreshTimeMs="
+ remoteRefreshTimeMs
+ ">="
+ " < "
+ "currentRemoteRefreshTimeMs="
+ this.remoteRefreshTimeMs;
this.remoteRefreshTimeMs = remoteRefreshTimeMs;
Expand Down Expand Up @@ -249,23 +249,23 @@ long getUploadBytesStarted() {
return uploadBytesStarted;
}

void addUploadBytesStarted(long size) {
public void addUploadBytesStarted(long size) {
uploadBytesStarted += size;
}

long getUploadBytesFailed() {
return uploadBytesFailed;
}

void addUploadBytesFailed(long size) {
public void addUploadBytesFailed(long size) {
uploadBytesFailed += size;
}

long getUploadBytesSucceeded() {
public long getUploadBytesSucceeded() {
return uploadBytesSucceeded;
}

void addUploadBytesSucceeded(long size) {
public void addUploadBytesSucceeded(long size) {
uploadBytesSucceeded += size;
}

Expand All @@ -277,15 +277,15 @@ long getTotalUploadsStarted() {
return totalUploadsStarted;
}

void incrementTotalUploadsStarted() {
public void incrementTotalUploadsStarted() {
totalUploadsStarted += 1;
}

long getTotalUploadsFailed() {
return totalUploadsFailed;
}

void incrementTotalUploadsFailed() {
public void incrementTotalUploadsFailed() {
totalUploadsFailed += 1;
failures.record(true);
}
Expand All @@ -294,7 +294,7 @@ long getTotalUploadsSucceeded() {
return totalUploadsSucceeded;
}

void incrementTotalUploadSucceeded() {
public void incrementTotalUploadsSucceeded() {
totalUploadsSucceeded += 1;
failures.record(false);
}
Expand Down Expand Up @@ -323,16 +323,21 @@ Map<String, Long> getLatestLocalFileNameLengthMap() {
return latestLocalFileNameLengthMap;
}

void setLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
public void setLatestLocalFileNameLengthMap(Map<String, Long> latestLocalFileNameLengthMap) {
this.latestLocalFileNameLengthMap = latestLocalFileNameLengthMap;
computeBytesLag();
}

void addToLatestUploadFiles(String file) {
public void addToLatestUploadFiles(String file) {
this.latestUploadFiles.add(file);
computeBytesLag();
}

public void setLatestUploadFiles(Set<String> files) {
this.latestUploadFiles.clear();
this.latestUploadFiles.addAll(files);
}

private void computeBytesLag() {
if (latestLocalFileNameLengthMap == null || latestLocalFileNameLengthMap.isEmpty()) {
return;
Expand All @@ -356,7 +361,7 @@ boolean isUploadBytesAverageReady() {
return uploadBytesMovingAverageReference.get().getAverage();
}

void addUploadBytes(long size) {
public void addUploadBytes(long size) {
synchronized (uploadBytesMutex) {
this.uploadBytesMovingAverageReference.get().record(size);
}
Expand All @@ -381,7 +386,7 @@ boolean isUploadBytesPerSecAverageReady() {
return uploadBytesPerSecMovingAverageReference.get().getAverage();
}

void addUploadBytesPerSec(long bytesPerSec) {
public void addUploadBytesPerSec(long bytesPerSec) {
synchronized (uploadBytesPerSecMutex) {
this.uploadBytesPerSecMovingAverageReference.get().record(bytesPerSec);
}
Expand All @@ -406,7 +411,7 @@ boolean isUploadTimeMsAverageReady() {
return uploadTimeMsMovingAverageReference.get().getAverage();
}

void addUploadTimeMs(long timeMs) {
public void addUploadTimeMs(long timeMs) {
synchronized (uploadTimeMsMutex) {
this.uploadTimeMsMovingAverageReference.get().record(timeMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.recovery.RecoveryStats;
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteRefreshSegmentPressureService;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.index.search.stats.ShardSearchStats;
import org.opensearch.index.seqno.ReplicationTracker;
Expand Down Expand Up @@ -326,8 +327,8 @@ Runnable getGlobalCheckpointSyncer() {
private volatile boolean useRetentionLeasesInPeerRecovery;
private final Store remoteStore;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;

private final boolean isTimeSeriesIndex;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;

public IndexShard(
final ShardRouting shardRouting,
Expand All @@ -352,7 +353,8 @@ public IndexShard(
final CircuitBreakerService circuitBreakerService,
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore
@Nullable final Store remoteStore,
final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand Down Expand Up @@ -447,6 +449,7 @@ public boolean shouldCache(Query query) {
this.isTimeSeriesIndex = (mapperService == null || mapperService.documentMapper() == null)
? false
: mapperService.documentMapper().mappers().containsTimeStampField();
this.remoteRefreshSegmentPressureService = remoteRefreshSegmentPressureService;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -3548,7 +3551,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (isRemoteStoreEnabled()) {
internalRefreshListener.add(new RemoteStoreRefreshListener(this));
internalRefreshListener.add(new RemoteStoreRefreshListener(this, remoteRefreshSegmentPressureService));
}
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
Expand Down
Loading

0 comments on commit 048398f

Please sign in to comment.