Skip to content

Commit

Permalink
Adjust task_concurrency to number of physical cores
Browse files Browse the repository at this point in the history
Logical (hyper-threaded) cores do not improve query performance.
Therefore, task_concurrency needs to be adjusted to the number of physical cores.

AWS r5.8xlarge machines (intel, 16 physical cores with HT, 32 logical cores)
	label 			TPCH wall time		TPC-DS wall time	TPCH CPU time 	TPC-DS CPU time 	TPCH peak mem 	TPC-DS peak mem
0 	concurrency_16_part 	1097.604667 		1346.841167 		119453.0 	156423.427667 		2.165998e+09 	1.282450e+09
1 	concurrency_32_part 	1056.615500 		1370.033500 		129010.1 	164247.540000 		2.164635e+09 	1.310248e+09
2 	concurrency_16_unpart 	904.007167 		2234.841000 		112177.4 	288472.953333 		2.101063e+09 	1.137481e+09
3 	concurrency_32_unpart 	907.333500 		2300.626167 		120640.0 	302876.445500 		2.119834e+09 	1.186475e+09

AWS r5g.8xlarge machines (graviton, 32 physical cores)

	label		 	TPCH wall time 		TPC-DS wall time 	TPCH CPU time 	TPC-DS CPU time		TPCH peak mem 	TPC-DS peak mem
0 	concurrency_16_part 	1063.112833 		1256.248833 		113851.2 	135454.912167 		2.129303e+09 	1.265366e+09
1 	concurrency_32_part 	980.932667 		1258.293667 		113708.1 	136666.440167 		2.144433e+09 	1.276637e+09
2 	concurrency_16_unpart 	811.310000 		1991.245667 		98893.1 	242522.966833 		2.081300e+09 	1.102874e+09
3 	concurrency_32_unpart 	757.619333 		1953.809333 		99628.9 	242261.435167 		2.098105e+09 	1.150644e+09
  • Loading branch information
sopel39 committed Dec 2, 2021
1 parent 93b309e commit a9fa389
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 5 deletions.
5 changes: 5 additions & 0 deletions core/trino-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
</dependency>

<dependency>
<groupId>com.github.scribejava</groupId>
<artifactId>scribejava-apis</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount;
import static it.unimi.dsi.fastutil.HashCommon.nextPowerOfTwo;
import static java.lang.Math.min;

@DefunctConfig({
"experimental.big-query-max-task-memory",
"task.max-memory",
Expand Down Expand Up @@ -67,7 +71,8 @@ public class TaskManagerConfig
private Duration infoUpdateInterval = new Duration(3, TimeUnit.SECONDS);

private int writerCount = 1;
private int taskConcurrency = 16;
// cap task concurrency to 32 in order to avoid small pages produced by local partitioning exchanges
private int taskConcurrency = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32);
private int httpResponseThreads = 100;
private int httpTimeoutThreads = 3;

Expand Down
55 changes: 55 additions & 0 deletions core/trino-main/src/main/java/io/trino/util/MachineInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.util;

import com.google.common.base.StandardSystemProperty;
import oshi.SystemInfo;

import static java.lang.Math.min;

/**
 * Static utility that reports how many physical processor cores are usable by this JVM.
 */
public final class MachineInfo
{
    // Memoized result; -1 marks "not computed yet". Caching keeps the machine from being
    // queried multiple times (for example during tests).
    private static volatile int physicalProcessorCount = -1;

    private MachineInfo() {}

    /**
     * Returns the physical processor count, capped at {@link Runtime#availableProcessors()}.
     * The value is computed on the first call and served from the cache afterwards.
     *
     * @return the cached or freshly computed number of available physical processors
     */
    public static int getAvailablePhysicalProcessorCount()
    {
        int cached = physicalProcessorCount;
        if (cached != -1) {
            return cached;
        }

        String arch = StandardSystemProperty.OS_ARCH.value();
        // logical core count (including container cpu quota if there is any)
        int logicalCores = Runtime.getRuntime().availableProcessors();

        // Oshi can recognize physical processor count (without hyper threading) for x86 platforms,
        // but it doesn't correctly recognize it for ARM platforms. Everything that is not x86 is
        // therefore treated as having one physical core per logical processor
        // (ARM platforms do not support hyper threading).
        int physicalCores = isX86(arch)
                ? new SystemInfo().getHardware().getProcessor().getPhysicalProcessorCount()
                : logicalCores;

        // cap the physical count to the logical count, i.e. to the container cpu quota (if there is any)
        int result = min(physicalCores, logicalCores);
        physicalProcessorCount = result;
        return result;
    }

    // True when the os.arch value is one of the two x86-64 spellings handled here.
    private static boolean isX86(String arch)
    {
        return "amd64".equals(arch) || "x86_64".equals(arch);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit;
import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount;
import static it.unimi.dsi.fastutil.HashCommon.nextPowerOfTwo;
import static java.lang.Math.min;

public class TestTaskManagerConfig
{
private static final int DEFAULT_PROCESSOR_COUNT = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32);

@Test
public void testDefaults()
{
Expand All @@ -54,7 +59,7 @@ public void testDefaults()
.setSinkMaxBroadcastBufferSize(DataSize.of(200, Unit.MEGABYTE))
.setMaxPagePartitioningBufferSize(DataSize.of(32, Unit.MEGABYTE))
.setWriterCount(1)
.setTaskConcurrency(16)
.setTaskConcurrency(DEFAULT_PROCESSOR_COUNT)
.setHttpResponseThreads(100)
.setHttpTimeoutThreads(3)
.setTaskNotificationThreads(5)
Expand All @@ -66,6 +71,7 @@ public void testDefaults()
@Test
public void testExplicitPropertyMappings()
{
int processorCount = DEFAULT_PROCESSOR_COUNT == 32 ? 16 : 32;
Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("task.initial-splits-per-node", "1")
.put("task.split-concurrency-adjustment-interval", "1s")
Expand All @@ -88,7 +94,7 @@ public void testExplicitPropertyMappings()
.put("sink.max-broadcast-buffer-size", "128MB")
.put("driver.max-page-partitioning-buffer-size", "40MB")
.put("task.writer-count", "4")
.put("task.concurrency", "8")
.put("task.concurrency", Integer.toString(processorCount))
.put("task.http-response-threads", "4")
.put("task.http-timeout-threads", "10")
.put("task.task-notification-threads", "13")
Expand Down Expand Up @@ -119,7 +125,7 @@ public void testExplicitPropertyMappings()
.setSinkMaxBroadcastBufferSize(DataSize.of(128, Unit.MEGABYTE))
.setMaxPagePartitioningBufferSize(DataSize.of(40, Unit.MEGABYTE))
.setWriterCount(4)
.setTaskConcurrency(8)
.setTaskConcurrency(processorCount)
.setHttpResponseThreads(4)
.setHttpTimeoutThreads(10)
.setTaskNotificationThreads(13)
Expand Down
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,12 @@
<version>3.0.3</version>
</dependency>

<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
<version>5.8.5</version>
</dependency>

<dependency>
<groupId>com.github.scribejava</groupId>
<artifactId>scribejava-apis</artifactId>
Expand Down Expand Up @@ -1301,7 +1307,7 @@
<!-- org.testcontainers:testcontainer's dependencies pull two different versions of this artifact and this is to negotiate the version -->
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>5.8.0</version>
<version>5.10.0</version>
</dependency>

<dependency>
Expand Down

0 comments on commit a9fa389

Please sign in to comment.