
Fork the sending of file chunks during recovery #74164

Merged

Conversation

DaveCTurner (Contributor):

Today if sending file chunks is CPU-bound (e.g. when using compression)
then we tend to concentrate all that work onto relatively few threads,
even if `indices.recovery.max_concurrent_file_chunks` is increased. With
this commit we fork the transmission of each chunk onto its own thread
so that the CPU-bound work can happen in parallel.
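
(Illustrative sketch only, not the actual recovery code in this PR: the idea can be shown with plain `java.util.concurrent`, where a cached executor stands in for the generic thread pool and a semaphore stands in for the `indices.recovery.max_concurrent_file_chunks` limit, so the CPU-bound compression of each chunk runs on its own thread.)

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;

public class ForkedChunkSender {

    private final ExecutorService executor = Executors.newCachedThreadPool(); // stand-in for the generic thread pool
    private final Semaphore inFlightChunks;                                   // stand-in for max_concurrent_file_chunks

    ForkedChunkSender(int maxConcurrentChunks) {
        this.inFlightChunks = new Semaphore(maxConcurrentChunks);
    }

    // Fork the compression and "send" of one chunk onto its own thread so the CPU-bound work runs in parallel.
    void sendChunk(byte[] chunk) throws InterruptedException {
        inFlightChunks.acquire(); // throttle: at most maxConcurrentChunks chunks are in flight at once
        executor.execute(() -> {
            try {
                byte[] compressed = compress(chunk); // the CPU-bound part (DEFLATE)
                transmit(compressed);                // placeholder for the network write
            } finally {
                inFlightChunks.release();
            }
        });
    }

    private static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    private static void transmit(byte[] compressed) {
        // no-op here; a real implementation would hand the compressed chunk to the transport layer
    }

    public static void main(String[] args) throws InterruptedException {
        ForkedChunkSender sender = new ForkedChunkSender(8); // the new maximum discussed below
        byte[] chunk = new byte[512 * 1024];
        Arrays.fill(chunk, (byte) 42);
        for (int i = 0; i < 32; i++) {
            sender.sendChunk(chunk);
        }
        sender.executor.shutdown();
        sender.executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```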
DaveCTurner added the >enhancement, :Distributed Indexing/Recovery, v8.0.0, and v7.14.0 labels on Jun 16, 2021
elasticmachine added the Team:Distributed label on Jun 16, 2021
elasticmachine (Collaborator):

Pinging @elastic/es-distributed (Team:Distributed)

DaveCTurner (Contributor, Author) commented Jun 16, 2021:

I benchmarked this with a 2.7GB (2757508192B) single-shard index being recovered between two nodes both running on my localhost, so network performance was effectively infinite. Here are the recovery times in milliseconds; "no fork" being today's behaviour and "fork" being the behaviour with this PR applied.

| Concurrent chunks | No fork, no compress | No fork, compress | Fork, no compress | Fork, compress |
|---|---|---|---|---|
| 1 | 12333 | 92035 | 10880 | 90692 |
| 2 | 11465 | 73406 | 10083 | 47946 |
| 3 | 10020 | 74651 | 9943 | 33440 |
| 4 | 10527 | 71518 | 10737 | 27793 |
| 5 | 11760 | 73879 | 10266 | 22617 |
| 6 | 11795 | 73998 | 9563 | 20162 |
| 7 | 9101 | 72073 | 9413 | 18839 |
| 8 | 10893 | 72563 | 11715 | 17123 |


The cases without compression aren't CPU-bound, so forking didn't make much difference there, but when compression is enabled it has a pretty significant effect. I also increased the maximum for the concurrent-chunks setting to 8, since I was still seeing improvements at today's maximum of 5.
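
(A back-of-the-envelope check on these numbers, not part of the original comment: a single compressed stream moves 2757508192 B in about 92 s, i.e. roughly 2757508192 / 92.035 ≈ 30 MB/s ≈ 28.6 MiB/s, which is close to the single-threaded DEFLATE throughput mentioned further down; with 8 forked chunks the compressed recovery takes about 17.1 s, a speed-up of roughly 92035 / 17123 ≈ 5.4×.)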

original-brownbear (Member) left a comment:

LGTM, but I do wonder whether compression has to be this expensive; this seems broken somehow.

```java
// Fork the actual sending onto a separate thread so we can send them concurrently even if CPU-bound (e.g. using compression).
// The AsyncIOProcessor and MultiFileWriter both concentrate their work onto fewer threads if possible, but once we have
// chunks to send we want to increase parallelism again.
threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
```
original-brownbear (Member):
Do you know why compression is so expensive (i.e. which part of it)? I wonder if we could get an improvement out of increasing the chunk size, if most of the cost is the per-chunk compression start/end overhead (maybe there's something to improve there as well).

DaveCTurner (Contributor, Author):

I think it's just that DEFLATE is expensive, especially if the data is already deflated (as it is for stored fields which make up the bulk of this index). `pv _1t.fdt | gzip > /dev/null` indicates my machine can only sustain 25 MiB/s on a single thread, which is about the performance we see with no parallelism.
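
(A minimal way to reproduce that single-thread ceiling in Java rather than with `pv`/`gzip`; this is an illustrative sketch, not part of the PR. Random bytes are used because they are effectively incompressible, much like already-deflated stored fields.)

```java
import java.util.Random;
import java.util.zip.Deflater;

public class DeflateThroughput {
    public static void main(String[] args) {
        byte[] data = new byte[64 * 1024 * 1024]; // 64 MiB of random, incompressible bytes
        new Random(0).nextBytes(data);
        byte[] out = new byte[1024 * 1024];

        Deflater deflater = new Deflater();
        long start = System.nanoTime();
        deflater.setInput(data);
        deflater.finish();
        while (!deflater.finished()) {
            deflater.deflate(out); // output is discarded; only the CPU cost matters here
        }
        deflater.end();
        long elapsed = System.nanoTime() - start;

        double mibPerSecond = (data.length / (1024.0 * 1024.0)) / (elapsed / 1e9);
        System.out.printf("single-thread DEFLATE throughput: %.1f MiB/s%n", mibPerSecond);
    }
}
```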

original-brownbear (Member):

Right, now I remember us benchmarking DEFLATE before => nothing we can do but parallelize, I guess :)

DaveCTurner merged commit 3660d86 into elastic:master on Jun 16, 2021
DaveCTurner (Contributor, Author):

Thanks Armin!

DaveCTurner deleted the 2021-06-16-fork-sending-file-chunks branch on June 16, 2021 at 10:58
DaveCTurner added a commit that referenced this pull request Jun 16, 2021
limingnihao pushed a commit to limingnihao/elasticsearch that referenced this pull request Jun 17, 2021
* master: (284 commits)
  [DOCS] Update central reporting image (elastic#74195)
  [DOCS] SQL: Document `null` handing for string functions (elastic#74201)
  Fix Snapshot Docs Listing Query Params in Body Incorrectly (elastic#74196)
  [DOCS] EQL: Note EQL uses `fields` parameter (elastic#74194)
  Mute failing MixedClusterClientYamlTestSuiteIT test {p0=indices.split/20_source_mapping/Split index ignores target template mapping} test (elastic#74198)
  Cleanup Duplicate Constants in Snapshot XContent Params (elastic#74114)
  [DOC] Add watcher to the threadpool doc (elastic#73935)
  [Rest Api Compatibility] Validate Query typed api (elastic#74171)
  Replace deprecated `script.cache.*` settings with `script.context.$constext.cache_*` in documentation. (elastic#74144)
  Pin Alpine Linux version in Docker builds (elastic#74169)
  Fix clone API settings docs bug (elastic#74175)
  [ML] refactor internal datafeed management (elastic#74018)
  Disable query cache for FunctionScoreQuery and ScriptScoreQuery (elastic#74060)
  Fork the sending of file chunks during recovery (elastic#74164)
  RuntimeField.Builder should not extend FieldMapper.Builder (elastic#73840)
  Run CheckIndex on metadata index before loading (elastic#73239)
  Deprecate setting version on analyzers (elastic#74073)
  Add test with null transform id in stats request (elastic#74130)
  Order imports when reformatting (elastic#74059)
  Move deprecation code from xpack core to deprecation module. (elastic#74120)
  ...
Labels
:Distributed Indexing/Recovery (anything around constructing a new shard, either from a local or a remote source), >enhancement, Team:Distributed (obsolete meta label for the distributed team; replaced by Distributed Indexing/Coordination), v7.14.0, v8.0.0-alpha1