Skip to content

Commit

Permalink
Increase remote recovery thread pool size (opensearch-project#10750)
Browse files Browse the repository at this point in the history
The remote recovery thread pool does blocking I/O when downloading
files, so the "half processor count max 10" was definitely too small.
This can be shown by triggering recoveries on a node that is also doing
segment replication, and the replication lag will increase due to
contention on that thread pool. Some amount of contention is inevitable,
but the change here to increase the download thread pool, and also limit
the concurrent usage of that thread pool by any single
recovery/replication to 25% of the threads does help.

Long term, we can improve this even further by moving to fully async I/O
to avoid blocking threads in the application on draining InputStreams.

Signed-off-by: Andrew Ross <[email protected]>
  • Loading branch information
andrross authored Oct 20, 2023
1 parent ffe9371 commit 1e28738
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;

Expand Down Expand Up @@ -87,10 +88,10 @@ public class RecoverySettings {
/**
* Controls the maximum number of streams that can be started concurrently per recovery when downloading from the remote store.
*/
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING = Setting.intSetting(
public static final Setting<Integer> INDICES_RECOVERY_MAX_CONCURRENT_REMOTE_STORE_STREAMS_SETTING = new Setting<>(
"indices.recovery.max_concurrent_remote_store_streams",
10,
1,
(s) -> Integer.toString(Math.max(1, OpenSearchExecutors.allocatedProcessors(s) / 2)),
(s) -> Setting.parseInt(s, 1, "indices.recovery.max_concurrent_remote_store_streams"),
Property.Dynamic,
Property.NodeScope
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,12 @@ public ThreadPool(
);
builders.put(
Names.REMOTE_RECOVERY,
new ScalingExecutorBuilder(Names.REMOTE_RECOVERY, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
new ScalingExecutorBuilder(
Names.REMOTE_RECOVERY,
1,
twiceAllocatedProcessors(allocatedProcessors),
TimeValue.timeValueMinutes(5)
)
);
if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) {
builders.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n);
sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive);
sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessorsMaxTen);
sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::halfAllocatedProcessorsMaxTen);
sizes.put(ThreadPool.Names.REMOTE_RECOVERY, ThreadPool::twiceAllocatedProcessors);
return sizes.get(threadPoolName).apply(numberOfProcessors);
}

Expand Down

0 comments on commit 1e28738

Please sign in to comment.