From a9fa38967bfd64ca1f0442097dc3513b185697d8 Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Fri, 26 Nov 2021 17:08:02 +0100 Subject: [PATCH] Adjust task_concurrency to number of physical cores Logical (hyper threaded) cores do not improve query performance. Therefore task_concurrency needs to be adjusted to number of physical cores. AWS r5.8xlarge machines (intel, 16 physical cores with HT, 32 logical cores) label TPCH wall time TPC-DS wall time TPCH CPU time TPC-DS CPU time TPCH peak mem TPC-DS peak mem 0 concurrency_16_part 1097.604667 1346.841167 119453.0 156423.427667 2.165998e+09 1.282450e+09 1 concurrency_32_part 1056.615500 1370.033500 129010.1 164247.540000 2.164635e+09 1.310248e+09 2 concurrency_16_unpart 904.007167 2234.841000 112177.4 288472.953333 2.101063e+09 1.137481e+09 3 concurrency_32_unpart 907.333500 2300.626167 120640.0 302876.445500 2.119834e+09 1.186475e+09 AWS r5g.8xlarge machines (graviton, 32 physical cores) label TPCH wall time TPC-DS wall time TPCH CPU time TPC-DS CPU time TPCH peak mem TPC-DS peak mem 0 concurrency_16_part 1063.112833 1256.248833 113851.2 135454.912167 2.129303e+09 1.265366e+09 1 concurrency_32_part 980.932667 1258.293667 113708.1 136666.440167 2.144433e+09 1.276637e+09 2 concurrency_16_unpart 811.310000 1991.245667 98893.1 242522.966833 2.081300e+09 1.102874e+09 3 concurrency_32_unpart 757.619333 1953.809333 99628.9 242261.435167 2.098105e+09 1.150644e+09 --- core/trino-main/pom.xml | 5 ++ .../io/trino/execution/TaskManagerConfig.java | 7 ++- .../main/java/io/trino/util/MachineInfo.java | 55 +++++++++++++++++++ .../execution/TestTaskManagerConfig.java | 12 +++- pom.xml | 8 ++- 5 files changed, 82 insertions(+), 5 deletions(-) create mode 100644 core/trino-main/src/main/java/io/trino/util/MachineInfo.java diff --git a/core/trino-main/pom.xml b/core/trino-main/pom.xml index d00f56398ece..20596f65df70 100644 --- a/core/trino-main/pom.xml +++ b/core/trino-main/pom.xml @@ -197,6 +197,11 @@ jackson-databind + + com.github.oshi + oshi-core + + com.github.scribejava scribejava-apis diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java index fb75752a5033..e19f460c8cba 100644 --- a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java @@ -30,6 +30,10 @@ import java.math.BigDecimal; import java.util.concurrent.TimeUnit; +import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount; +import static it.unimi.dsi.fastutil.HashCommon.nextPowerOfTwo; +import static java.lang.Math.min; + @DefunctConfig({ "experimental.big-query-max-task-memory", "task.max-memory", @@ -67,7 +71,8 @@ public class TaskManagerConfig private Duration infoUpdateInterval = new Duration(3, TimeUnit.SECONDS); private int writerCount = 1; - private int taskConcurrency = 16; + // cap task concurrency to 32 in order to avoid small pages produced by local partitioning exchanges + private int taskConcurrency = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32); private int httpResponseThreads = 100; private int httpTimeoutThreads = 3; diff --git a/core/trino-main/src/main/java/io/trino/util/MachineInfo.java b/core/trino-main/src/main/java/io/trino/util/MachineInfo.java new file mode 100644 index 000000000000..9011b59f40dd --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/util/MachineInfo.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.util; + +import com.google.common.base.StandardSystemProperty; +import oshi.SystemInfo; + +import static java.lang.Math.min; + +public final class MachineInfo +{ + // cache physical processor count, so that it's not queried multiple times during tests + private static volatile int physicalProcessorCount = -1; + + private MachineInfo() {} + + public static int getAvailablePhysicalProcessorCount() + { + if (physicalProcessorCount != -1) { + return physicalProcessorCount; + } + + String osArch = StandardSystemProperty.OS_ARCH.value(); + // logical core count (including container cpu quota if there is any) + int availableProcessorCount = Runtime.getRuntime().availableProcessors(); + int totalPhysicalProcessorCount; + if ("amd64".equals(osArch) || "x86_64".equals(osArch)) { + // Oshi can recognize physical processor count (without hyper threading) for x86 platforms. + // However, it doesn't correctly recognize physical processor count for ARM platforms. + totalPhysicalProcessorCount = new SystemInfo() + .getHardware() + .getProcessor() + .getPhysicalProcessorCount(); + } + else { + // ARM platforms do not support hyper threading, therefore each logical processor is separate core + totalPhysicalProcessorCount = availableProcessorCount; + } + + // cap available processor count to container cpu quota (if there is any). + physicalProcessorCount = min(totalPhysicalProcessorCount, availableProcessorCount); + return physicalProcessorCount; + } +} diff --git a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java index 481ffdefe5bd..cd54f510a3ff 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java @@ -26,9 +26,14 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.airlift.units.DataSize.Unit; +import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount; +import static it.unimi.dsi.fastutil.HashCommon.nextPowerOfTwo; +import static java.lang.Math.min; public class TestTaskManagerConfig { + private static final int DEFAULT_PROCESSOR_COUNT = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32); + @Test public void testDefaults() { @@ -54,7 +59,7 @@ public void testDefaults() .setSinkMaxBroadcastBufferSize(DataSize.of(200, Unit.MEGABYTE)) .setMaxPagePartitioningBufferSize(DataSize.of(32, Unit.MEGABYTE)) .setWriterCount(1) - .setTaskConcurrency(16) + .setTaskConcurrency(DEFAULT_PROCESSOR_COUNT) .setHttpResponseThreads(100) .setHttpTimeoutThreads(3) .setTaskNotificationThreads(5) @@ -66,6 +71,7 @@ public void testDefaults() @Test public void testExplicitPropertyMappings() { + int processorCount = DEFAULT_PROCESSOR_COUNT == 32 ? 16 : 32; Map properties = new ImmutableMap.Builder() .put("task.initial-splits-per-node", "1") .put("task.split-concurrency-adjustment-interval", "1s") @@ -88,7 +94,7 @@ public void testExplicitPropertyMappings() .put("sink.max-broadcast-buffer-size", "128MB") .put("driver.max-page-partitioning-buffer-size", "40MB") .put("task.writer-count", "4") - .put("task.concurrency", "8") + .put("task.concurrency", Integer.toString(processorCount)) .put("task.http-response-threads", "4") .put("task.http-timeout-threads", "10") .put("task.task-notification-threads", "13") @@ -119,7 +125,7 @@ public void testExplicitPropertyMappings() .setSinkMaxBroadcastBufferSize(DataSize.of(128, Unit.MEGABYTE)) .setMaxPagePartitioningBufferSize(DataSize.of(40, Unit.MEGABYTE)) .setWriterCount(4) - .setTaskConcurrency(8) + .setTaskConcurrency(processorCount) .setHttpResponseThreads(4) .setHttpTimeoutThreads(10) .setTaskNotificationThreads(13) diff --git a/pom.xml b/pom.xml index 4380cfedd344..c3808bd35e9e 100644 --- a/pom.xml +++ b/pom.xml @@ -988,6 +988,12 @@ 3.0.3 + + com.github.oshi + oshi-core + 5.8.5 + + com.github.scribejava scribejava-apis @@ -1301,7 +1307,7 @@ net.java.dev.jna jna - 5.8.0 + 5.10.0