Back pressure for lagging remote segments upload #6851

Closed
ashking94 opened this issue Mar 28, 2023 · 1 comment
ashking94 commented Mar 28, 2023

Back pressure for write requests during remote segments upload

This issue focuses on the decision making for rejecting write requests when remote store uploads are failing or the segments state on the remote store is lagging behind the local segments state. The document lists the considerations that go into the decision to reject a write request, such as the different gating characteristics, the node/shard-level evaluation scope, and isolating failures from delayed uploads.

1. Background

With remote-backed indexes, the segments and the translogs are uploaded to the remote store to achieve refresh- or request-level durability. The remote segment store also plays a vital role in the deletion of translogs from the local/remote store (based on remote segments upload) and in the transfer of segments during segment replication. With this added integration point, the remote store also becomes a point of failure. Uploads to the remote store can slow down or fail due to network issues, throttling from the remote store, long GC pauses or high CPU utilisation. If there are issues with remote store uploads, then there should be a mechanism to put back pressure on the write path and eventually remove the back pressure if the situation improves, or fail the shard if the situation deteriorates.

1.1. Problem Statement

For remote-backed indexes, segments are uploaded on refreshes (which are triggered not just by refresh calls, but also by flushes and segment merges), which helps in achieving refresh-level durability. On top of that, a user can also enable remote translog, which gives request-level durability. If the segments upload to the remote store fails, the upload is retried on the next refresh. The remote segments can start lagging on account of one or more remote store upload failures, leading to stale data on the remote store. Currently, segment replication uses peer-to-peer copying of segment files, so a lag in remote store uploads does not lead to failed segment replication attempts or stale searches. However, there is a plan to use the remote store for copying segment files (the flow would be: 1. the primary uploads segments to the remote store, 2. the primary publishes a checkpoint action to the replicas, and 3. the replicas download the segment files). There is also a plan to extend this to hydration of the follower cluster in the cross-cluster replication use case.

1.2. User stories

  • As a user, I want to be aware of the lag that the remote store has in comparison to the primary’s segment state. This will help me isolate issues and fix them.
  • As a user, I want to be able to gauge the lag in terms of 1) how stale the remote segment store is in comparison to the local segment store, 2) how many bytes the remote segment store is behind the local segment store, 3) how many refresh checkpoints the remote segment store is behind the local store, and a couple more listed in section 3 - Considerations.
  • As a user, I want OpenSearch to stop taking in new writes if the remote segment store is not up to date, along with being able to define the threshold that triggers the rejection.
  • As a user, I want granular visibility into remote segments upload successes, failures and delays at the shard/index, node and cluster level, which helps me see the overall health of my cluster and take action if required.

2. Goal

Introduce a back pressure mechanism to reject requests when the remote store segment uploads are lagging behind the local segments state. Provide visibility into important remote store stats at the shard, node and cluster levels.

Extended Goal

Fail the shard if the lag goes beyond a configurable threshold. Note: we would need to be careful when failing shards here, as that can lead to shards bouncing between nodes while the underlying issue still exists.

3. Considerations

There are multiple areas that need to be considered when deciding the back pressure mechanism. A few of them are listed below -

3.1. Gating mechanism parameters

The back pressure or the shard failure on remote segments can consider the following characteristics -

  • Local segments are ahead of remote segments by “N” checkpoints.
  • Local segments are ahead of remote segments by “S” bytes.
  • Local segments are ahead of remote segments by “T” time.
  • Uploads have failed “N” times consecutively, or uploads have failed more than “N” times in the last “M” attempts.
  • Local segments are ahead of remote segments by “X” documents.

Used parameters for rejection decision

Below are the parameters we will use for the rejection logic -

1. Ahead by “N” checkpoints

Let’s assume we assign each refresh a sequence number. If the local refresh is at seq_no x while the remote store’s last successful upload was at seq_no y, the lag is (x - y) checkpoints / seq_no. If the diff (x - y) is more than M (backed by a dynamic setting), then we start putting back pressure on the write path.
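A minimal sketch of the checkpoint-lag check. The tracker class, its fields and the default limit are illustrative assumptions, not the actual OpenSearch implementation:

```java
// Hypothetical per-shard tracker for refresh sequence numbers (illustrative names).
class RemoteRefreshLagTracker {
    private volatile long localRefreshSeqNo;   // seq_no of the latest local refresh (x)
    private volatile long remoteRefreshSeqNo;  // seq_no of the last successful remote upload (y)
    private volatile long maxRefreshCheckpointsBehind = 4; // M, backed by a dynamic setting

    void onLocalRefresh(long seqNo) { localRefreshSeqNo = seqNo; }
    void onRemoteUploadSuccess(long seqNo) { remoteRefreshSeqNo = seqNo; }

    long refreshCheckpointsBehind() {
        return localRefreshSeqNo - remoteRefreshSeqNo; // lag = x - y
    }

    boolean shouldRejectOnCheckpointLag() {
        return refreshCheckpointsBehind() > maxRefreshCheckpointsBehind;
    }
}
```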

2. Ahead by “S” bytes

If the local store holds segments state S1 while the remote store holds segments state S2, where S2 is an older state, then S1 may have many segment files that S2 does not. In cases like this, we first get the diff of files between the local and remote store, and then calculate the number of bytes “S” that need to be uploaded to the remote store. If “S” is above a dynamic threshold T1, then we start putting back pressure.

Dynamic threshold T1 is computed in the following manner. We will have a dynamic setting remote_store.segment_upload.pressure.bytes.lag.limit, and we will also compute a dynamic threshold based on the moving average of the last N bytes_behind values. We will compute bytes_behind by comparing the sizes of files that are present in the local segment infos but not in the remote store. The moving average times a variance factor “V” is the dynamically computed threshold. We take the minimum of the setting-based limit and the dynamically computed threshold.

Setting based threshold
setting_based_threshold = backed by dynamic setting

Dynamic computed threshold
avg_bytes_behind = moving average of last “N” bytes_behind values observed during remote segment uploads
variance_factor = backed by dynamic setting
dynamic_computed_threshold = (avg_bytes_behind x variance_factor)

During a write request, we would compute the dynamic threshold as the minimum of the following (see the sketch after this list) -

  • setting_based_threshold
  • dynamic_computed_threshold
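A sketch of the bytes-lag gate under the above rules. The class, the window size, the default values and the MovingAverage helper are illustrative assumptions; only the setting remote_store.segment_upload.pressure.bytes.lag.limit comes from this proposal.

```java
// Sketch: reject when bytes_behind exceeds min(setting based threshold, moving avg * variance factor).
class BytesLagGate {
    private volatile long settingBasedThresholdBytes = 10L << 30; // remote_store.segment_upload.pressure.bytes.lag.limit (e.g. 10 GiB)
    private volatile double varianceFactor = 2.0;                 // V, backed by a dynamic setting
    private final MovingAverage bytesBehindAverage = new MovingAverage(20); // last N bytes_behind values

    // Called from the remote refresh path (not the write path) to record observed lag.
    void onRemoteRefresh(long bytesBehind) {
        bytesBehindAverage.record(bytesBehind);
    }

    // Called on the write path with the currently computed bytes_behind.
    boolean shouldReject(long currentBytesBehind) {
        double dynamicComputedThreshold = bytesBehindAverage.average() * varianceFactor;
        double threshold = bytesBehindAverage.hasSamples()
                ? Math.min((double) settingBasedThresholdBytes, dynamicComputedThreshold)
                : settingBasedThresholdBytes; // fall back to the setting until data has been observed
        return currentBytesBehind > threshold;
    }
}

// Simple fixed-window moving average, reused by the later sketches as well.
class MovingAverage {
    private final long[] window;
    private int count;
    private long sum;

    MovingAverage(int size) {
        this.window = new long[size];
    }

    synchronized void record(long value) {
        int idx = count % window.length;
        sum += value - window[idx];
        window[idx] = value;
        count++;
    }

    synchronized double average() {
        int n = Math.min(count, window.length);
        return n == 0 ? 0.0 : (double) sum / n;
    }

    synchronized boolean hasSamples() {
        return count > 0;
    }
}
```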

3. Ahead by “T” time

Let’s assume that the most recent refresh on local occurred at time “T1”, and the most recent successful upload (as part of the remote refresh listener) occurred at time “T2”. If the difference between “T1” and “T2” is above a threshold “X”, then we start putting back pressure on the write path.

The threshold “X” is the maximum of the index refresh interval and the minimum of (the setting-based threshold and a dynamic moving-average threshold). The dynamic moving-average threshold is the upload-time moving average multiplied by a variance factor.

Settings
setting_based_threshold = backed by dynamic setting
variance_factor = backed by dynamic setting

Dynamic computed threshold
avg_upload_time = moving average of the upload time over the last N remote refreshes
dynamic_computed_threshold = (avg_upload_time x variance_factor)

During a write request, we would compute the threshold “X” as the maximum of the following (see the sketch after this list) -

  • index refresh interval
  • minimum of setting_based_threshold and dynamic_computed_threshold
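A sketch of the time-lag gate following the same pattern; it reuses the MovingAverage helper from the bytes-lag sketch above, and all names and default values are illustrative assumptions.

```java
// Sketch: X = max(refresh interval, min(setting based threshold, avg upload time * variance factor)).
class TimeLagGate {
    private volatile long settingBasedThresholdMillis = 10_000; // backed by a dynamic setting
    private volatile double varianceFactor = 2.0;               // backed by a dynamic setting
    private final MovingAverage uploadTimeAverage = new MovingAverage(20); // last N upload times (ms)

    // Called when a remote segment upload finishes, with the time it took.
    void onUploadCompleted(long uploadTimeMillis) {
        uploadTimeAverage.record(uploadTimeMillis);
    }

    boolean shouldReject(long localRefreshTimeMillis, long remoteRefreshTimeMillis, long refreshIntervalMillis) {
        long timeBehind = localRefreshTimeMillis - remoteRefreshTimeMillis;
        double dynamicComputedThreshold = uploadTimeAverage.average() * varianceFactor;
        double threshold = Math.max(
                (double) refreshIntervalMillis,
                Math.min((double) settingBasedThresholdMillis, dynamicComputedThreshold));
        return timeBehind > threshold;
    }
}
```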

4. “N” Continuous failures

If “N” consecutive uploads have failed, we start rejecting requests (see the sketch below).
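A minimal sketch of the consecutive-failure gate (class name and default limit are illustrative assumptions):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: reject once N consecutive remote segment uploads have failed.
class ConsecutiveFailureGate {
    private final AtomicInteger consecutiveFailures = new AtomicInteger();
    private volatile int maxConsecutiveFailures = 5; // N, backed by a dynamic setting

    void onUploadSuccess() { consecutiveFailures.set(0); }           // any success resets the streak
    void onUploadFailure() { consecutiveFailures.incrementAndGet(); }

    boolean shouldReject() {
        return consecutiveFailures.get() >= maxConsecutiveFailures;
    }
}
```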

5. Inflight upload bytes > “X” bytes

Inflight bytes can be calculated by subtracting the failed and succeeded upload bytes from the started upload bytes. If the current inflight upload bytes are greater than the dynamic threshold, we reject the request (see the sketch below).

Dynamic computed threshold
avg_upload_bytes = moving average of last “N” upload bytes values observed during remote segment uploads
variance_factor = backed by dynamic setting
dynamic_computed_threshold = (avg_upload_bytes x variance_factor)
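A sketch of the inflight-bytes gate. The counters mirror the stats listed in section 4, the MovingAverage helper comes from the bytes-lag sketch, and all names and defaults are illustrative assumptions.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: inflight bytes = started - succeeded - failed; reject when it exceeds avg upload bytes * variance factor.
class InflightBytesGate {
    private final AtomicLong uploadBytesStarted = new AtomicLong();
    private final AtomicLong uploadBytesSucceeded = new AtomicLong();
    private final AtomicLong uploadBytesFailed = new AtomicLong();
    private volatile double varianceFactor = 10.0; // backed by a dynamic setting
    private final MovingAverage uploadBytesAverage = new MovingAverage(20); // last N upload bytes values

    void beforeUpload(long bytes) { uploadBytesStarted.addAndGet(bytes); }
    void afterUploadSuccess(long bytes) { uploadBytesSucceeded.addAndGet(bytes); uploadBytesAverage.record(bytes); }
    void afterUploadFailure(long bytes) { uploadBytesFailed.addAndGet(bytes); }

    long inflightUploadBytes() {
        return uploadBytesStarted.get() - uploadBytesSucceeded.get() - uploadBytesFailed.get();
    }

    boolean shouldReject() {
        double dynamicComputedThreshold = uploadBytesAverage.average() * varianceFactor;
        return uploadBytesAverage.hasSamples() && inflightUploadBytes() > dynamicComputedThreshold;
    }
}
```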

Unused parameters

1. Ahead by “X” documents

If the local segments have M searchable documents while the remote segments have N searchable documents, and the difference between M and N is greater than a configurable threshold, then we apply back pressure on the write path. If the difference goes above a further configurable threshold, then we can fail the shard. This is an approximate value of the documents in the segments, so we will mostly not use it for the rejection decision.

Note - this is not being considered in the back-pressure decision logic because getting the exact count of docs is non-trivial. Using the overall count of docs in the segments file will not provide an accurate value of the difference, since docs can be deleted and newly added between the two states of the segment files. The other option of relying on the auto-generated seqNo, which is also indexed with each document, is not foolproof for cases where documents are deleted and segments subsequently merged.

Additional considerations

3.2. Scope of back pressure

The back pressure can be handled at node, shard or repository level.

3.2.1 Node level back pressure

Back pressure at the node level can consider aggregate values of the gating parameters over the node and make a decision to reject write requests or fail the shard appropriately.

Use cases -

  • If the last m uploads have failed for all the primary shards on a node, then it is most likely a node issue; we apply rejection for some time, allowing the background upload to finish. If the upload is still failing, then it is suitable to fail the shards. Initially, we will only run shard failure in shadow mode, since failing a shard has a lot of overhead in reallocating it to another node.

3.2.2 Shard or IndexShard level back pressure

Back pressure at the shard level will only consider the underlying shard’s gating parameters and make a decision to reject write requests.

Use cases -

  • If a shard is experiencing heavy indexing, segment upload time can increase. If we continue to accept more requests, the refresh lag seen by the user can increase even more. We can reject requests until the refresh lag is within an expected threshold.
  • If the remote segment upload has failed m times for a shard, then we start rejecting write requests. Meanwhile, we keep trying to upload the most recently refreshed segment state.

3.2.3 Repository level back pressure

Today, a user can select the same backing repository at the global level or pass in different repositories for different indexes. Hence, there can be one or more backing repositories in a cluster. Back pressure at the repository level will consider repository-level aggregates and make a decision to reject write requests.

Use cases -

  • If multiple shards share the same repository and one or more of those shards are experiencing heavy lag, then we reject write requests for all shards that share the same repository. This allows time for the lagging shard(s) to catch up.

3.3. Failures vs Delays in Upload

The lag in remote store segments can happen on account of remote upload failures or slow uploads. We would need to maintain the count of failed and delayed uploads per shard and across the node.

Use cases - If there are permanent failures, it would make sense to outright reject all requests. However, if the lag is due to slow uploads, then we can throttle “X” percent of requests. The value of “X” can be determined by considering how many bytes the remote store is behind the local store (see the sketch below).
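A sketch of partial throttling for slow (but not permanently failing) uploads. The mapping from bytes lag to a rejection ratio is an assumption for illustration; the proposal only says “X” should depend on how far the remote store is behind.

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch: probabilistically reject a fraction of write requests proportional to the bytes lag.
class PartialThrottle {

    // Rejects roughly X percent of requests, where X grows with how far bytes_behind
    // exceeds the bytes-lag threshold, capped at 100%.
    boolean shouldReject(long bytesBehind, long bytesLagThreshold) {
        if (bytesBehind <= bytesLagThreshold || bytesLagThreshold <= 0) {
            return false; // within the allowed lag, accept the request
        }
        double rejectionRatio = Math.min(1.0, (double) (bytesBehind - bytesLagThreshold) / bytesLagThreshold);
        return ThreadLocalRandom.current().nextDouble() < rejectionRatio;
    }
}
```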

4. Required Stats

Based on the above considerations, we would need the following stats (a sketch of a shard-level stats holder follows the shard-level list) -

Shard level

  • Checkpoints behind local - If M refreshes have happened locally but the remote has data up to refresh N, the checkpoint lag is M - N.
  • Time behind local - latest refresh time on local minus the most recent successful upload time on the remote store.
  • Documents behind local - the document count at the last refreshed checkpoint on local minus the document count at the remote store’s refreshed checkpoint (approximate).
  • Last N upload status - whether it failed or succeeded
  • Last N upload time - Time taken for upload to succeed or fail.
  • total upload started bytes - before the upload starts, the bytes are incremented here
  • total upload completed bytes - after the upload completes, the bytes are incremented here
  • total upload failed bytes - after the upload fails, the bytes are incremented here
  • total upload inflight bytes = (total upload started bytes) - (total upload completed bytes) - (total upload failed bytes)
  • bytes behind - computed by checking local and remote segment files.
  • total refresh uploads started
  • total refresh uploads completed
  • total refresh uploads failed
  • total refresh uploads in flight
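A sketch of a per-shard stats holder covering the counters above; the class and field names are illustrative assumptions, not the actual stats API.

```java
// Sketch: shard-level remote segment upload stats (plain fields for brevity; real code would use atomics).
class RemoteSegmentUploadShardStats {
    long refreshCheckpointsBehind;   // checkpoints behind local
    long timeBehindMillis;           // time behind local
    long bytesBehind;                // bytes behind, from the local vs remote segment file diff
    long uploadBytesStarted;
    long uploadBytesSucceeded;
    long uploadBytesFailed;
    long refreshUploadsStarted;
    long refreshUploadsSucceeded;
    long refreshUploadsFailed;

    long inflightUploadBytes() {
        return uploadBytesStarted - uploadBytesSucceeded - uploadBytesFailed;
    }

    long inflightRefreshUploads() {
        return refreshUploadsStarted - refreshUploadsSucceeded - refreshUploadsFailed;
    }
}
```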

Node level

  • remote upload success percent = considering the last upload status of all shards on a node, the percentage of shards whose last upload succeeded.
  • remote upload failure percent = same as above, but the numerator becomes the shards whose last upload failed.

Back pressure stats -

  • number of requests rejected for a shard
  • number of requests rejected on a node
  • total number of requests rejected on a cluster

5. Proposed approach

5.1. Back pressure logic

[Flow diagram: back pressure decision logic]

Corner cases

  • Segment merges - Whenever there is a segment merge, an implicit refresh occurs. This leads to new merged segments being created, and metrics like bytes_behind and inflight_upload_bytes see a sudden jump. This can be handled by combining the bytes_behind check with the refresh_checkpoints_behind check. We will also

Shard or Repository Level
refresh_checkpoints_behind - Refer 1. Ahead by “N” checkpoints
bytes_behind - Refer 2. Ahead by “S” bytes
time_behind - Refer 3. Ahead by “T” time
inflight_upload_bytes - size of segment files for which the upload has started but is yet to succeed or fail.
last_m_consecutive_failures - the last m segment upload attempts have failed

Note - The above metrics are reported as stats as well.

Node Level
last_n_shard_failure_percent - If “x” shards (absolute) or “x” percent of the total shards on a node have reported n consecutive failures, then we reject requests on all the shards on that node. We would be emitting stats for the same. We also need to evaluate whether to fail the shard in such cases or whether emitting the metrics is enough.
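A sketch of how the individual gates from section 3 could be combined into a single shard-level rejection decision, reusing the illustrative classes from the earlier sketches; the structure and the exception type are assumptions, not the final design.

```java
// Sketch: shard-level back pressure decision combining the gating parameters above.
class RemoteSegmentUploadPressureService {
    private final RemoteRefreshLagTracker checkpointGate = new RemoteRefreshLagTracker();
    private final BytesLagGate bytesGate = new BytesLagGate();
    private final TimeLagGate timeGate = new TimeLagGate();
    private final InflightBytesGate inflightGate = new InflightBytesGate();
    private final ConsecutiveFailureGate failureGate = new ConsecutiveFailureGate();

    void validateWrite(long bytesBehind, long localRefreshTimeMillis,
                       long remoteRefreshTimeMillis, long refreshIntervalMillis) {
        // Combine bytes_behind with refresh_checkpoints_behind so a burst of merged segments
        // alone (the segment-merge corner case above) does not trigger rejection.
        boolean laggingBySize = checkpointGate.shouldRejectOnCheckpointLag()
                && bytesGate.shouldReject(bytesBehind);
        boolean laggingByTime = timeGate.shouldReject(
                localRefreshTimeMillis, remoteRefreshTimeMillis, refreshIntervalMillis);

        if (laggingBySize || laggingByTime || failureGate.shouldReject() || inflightGate.shouldReject()) {
            throw new IllegalStateException("rejecting write request: remote segment upload is lagging");
        }
    }
}
```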

6. Execution plan

The execution will consist of the following changes -

6.1. Testing

  • Functional testing - Make sure there is no regression in OpenSearch functionality. Test that back pressure works by inducing slow uploads or failures.
  • Performance testing - Perform performance tests with different combinations, with and without segment upload back pressure. We should see no degradation in throughput, latency, run duration, CPU, memory or garbage collection.
  • Long-running performance runs

Testing scenarios

  1. Low indexing workload and evenly distributed shards
    Objective - verify stats are showing up correctly, no back pressure should apply. Verify upload stats are uniformly showing up per shard & per node.

  2. Heavy indexing workload and evenly distributed shards
    Objective - verify stats are showing up correctly, back pressure should be applied.

  3. Heavy indexing workload and unevenly distributed shards
    Objective - verify stats are showing up correctly, back pressure should be applied on shards under duress.

  4. Slow network throughput

  5. Throttling by remote store

  6. Failed remote segments upload / full remote store failure

@ashking94
Member Author

Will be capturing testing results in #7225
