Fork the sending of file chunks during recovery #74164

Merged
7 changes: 4 additions & 3 deletions docs/reference/modules/indices/recovery.asciidoc
@@ -57,11 +57,12 @@ You can use the following _expert_ setting to manage resources for peer
 recoveries.

 `indices.recovery.max_concurrent_file_chunks`::
-(<<cluster-update-settings,Dynamic>>, Expert) Number of file chunk requests
-sent in parallel for each recovery. Defaults to `2`.
+(<<cluster-update-settings,Dynamic>>, Expert) Number of file chunks sent in
+parallel for each recovery. Defaults to `2`.
 +
 You can increase the value of this setting when the recovery of a single shard
-is not reaching the traffic limit set by `indices.recovery.max_bytes_per_sec`.
+is not reaching the traffic limit set by `indices.recovery.max_bytes_per_sec`,
+up to a maximum of `8`.

 `indices.recovery.max_concurrent_operations`::
 (<<cluster-update-settings,Dynamic>>, Expert) Number of operations sent
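Since the setting is dynamic, it can be raised on a running cluster when recoveries are CPU-bound rather than network-bound. The snippet below is a minimal sketch, not part of this PR, assuming the 7.x Java high-level REST client; the host, port, and the chosen value of `4` are placeholders.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public class RaiseFileChunkConcurrency {
    public static void main(String[] args) throws Exception {
        // Assumed endpoint; adjust host and port for your cluster.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            // Raise the expert setting only when recoveries are not saturating
            // indices.recovery.max_bytes_per_sec; values above 8 are rejected after this PR.
            ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest()
                .transientSettings(Settings.builder()
                    .put("indices.recovery.max_concurrent_file_chunks", 4)
                    .build());
            client.cluster().putSettings(request, RequestOptions.DEFAULT);
        }
    }
}
```

Transient settings reset on a full cluster restart; use `persistentSettings` instead if the override should survive restarts.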
@@ -80,7 +80,7 @@ public class RecoverySettings {
      * Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
      */
     public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING =
-        Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 5, Property.Dynamic, Property.NodeScope);
+        Setting.intSetting("indices.recovery.max_concurrent_file_chunks", 2, 1, 8, Property.Dynamic, Property.NodeScope);

     /**
      * Controls the maximum number of operation chunk requests that can be sent concurrently from the source node to the target node.
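For readers unfamiliar with Elasticsearch's settings infrastructure: a `Dynamic`, `NodeScope` setting like this is normally read once at startup and then kept current through a `ClusterSettings` update consumer. The sketch below shows that general pattern only; the class and field names are invented for illustration and do not reflect the actual `RecoverySettings` wiring.

```java
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoverySettings;

// Illustrative only: consume a dynamic node-scope setting and track updates.
class FileChunkConcurrencyTracker {
    // volatile because dynamic updates arrive on a different thread than readers
    private volatile int maxConcurrentFileChunks;

    FileChunkConcurrencyTracker(Settings settings, ClusterSettings clusterSettings) {
        // Initial value: defaults to 2 and must lie within [1, 8] after this PR.
        this.maxConcurrentFileChunks =
            RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
        // Keep the value current when it is changed via the cluster settings API.
        clusterSettings.addSettingsUpdateConsumer(
            RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING,
            value -> this.maxConcurrentFileChunks = value);
    }

    int maxConcurrentFileChunks() {
        return maxConcurrentFileChunks;
    }
}
```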
@@ -16,6 +16,7 @@
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionListenerResponseHandler;
+import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.RetryableAction;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.breaker.CircuitBreakingException;

@@ -202,12 +203,17 @@ public void writeFileChunk(StoreFileMetadata fileMetadata, long position, Releas
         final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest(
             recoveryId, requestSeqNo, shardId, fileMetadata, position, content, lastChunk, totalTranslogOps, throttleTimeInNanos);
         final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
-        executeRetryableAction(
-            action,
-            request,
-            fileChunkRequestOptions,
-            ActionListener.runBefore(listener.map(r -> null), request::decRef),
-            reader);
+
+        // Fork the actual sending onto a separate thread so we can send them concurrently even if CPU-bound (e.g. using compression).
+        // The AsyncIOProcessor and MultiFileWriter both concentrate their work onto fewer threads if possible, but once we have
+        // chunks to send we want to increase parallelism again.
+        threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
Member: Do you know why compression is so expensive (i.e. which part of it)? I wonder if we could get an improvement out of increasing the chunk size, if most of the cost is the per-compression start/end overhead (maybe there's something to improve there as well).

Contributor (author): I think it's just that DEFLATE is expensive, especially if the data is already deflated (as it is for stored fields, which make up the bulk of this index). `pv _1t.fdt | gzip > /dev/null` indicates my machine can only sustain 25MiB/s on a single thread, which is about the performance we see with no parallelism.

Member: Right, now I remember us benchmarking DEFLATE before => nothing we can do but parallelize, I guess :)
+            executeRetryableAction(
+                action,
+                request,
+                fileChunkRequestOptions,
+                ActionListener.runBefore(l.map(r -> null), request::decRef),
+                reader)));
     }

     @Override
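The code comment above (and the review thread) hinge on DEFLATE being CPU-bound, so a single sending thread caps recovery throughput well below `indices.recovery.max_bytes_per_sec`. The standalone sketch below is not Elasticsearch code; it illustrates the point with `java.util.zip.Deflater`: compressing independent chunks on more threads raises aggregate throughput until cores are saturated. Chunk size, thread counts, and the use of random (incompressible) data are arbitrary choices for the demo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.zip.Deflater;

public class ParallelDeflateDemo {
    public static void main(String[] args) throws Exception {
        final int chunkSize = 512 * 1024;  // 512 KiB per "file chunk"
        final int chunksPerRun = 64;
        byte[] chunk = new byte[chunkSize];
        // Random bytes barely compress, much like already-deflated stored fields.
        ThreadLocalRandom.current().nextBytes(chunk);

        for (int threads : new int[] { 1, 2, 4 }) {
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 0; i < chunksPerRun; i++) {
                tasks.add(() -> deflate(chunk));
            }
            long start = System.nanoTime();
            pool.invokeAll(tasks); // blocks until every chunk has been compressed
            double seconds = (System.nanoTime() - start) / 1e9;
            double mib = (double) chunkSize * chunksPerRun / (1024 * 1024);
            System.out.printf("%d thread(s): %.1f MiB/s%n", threads, mib / seconds);
            pool.shutdown();
        }
    }

    // Compress one chunk with DEFLATE and return the compressed size.
    private static long deflate(byte[] input) {
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION);
        deflater.setInput(input);
        deflater.finish();
        byte[] out = new byte[64 * 1024];
        long total = 0;
        while (!deflater.finished()) {
            total += deflater.deflate(out);
        }
        deflater.end();
        return total;
    }
}
```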