From 4fa2a16094d39e0d19638a53d1bf72f5a98ef7c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Thu, 10 Mar 2022 18:13:31 +0100 Subject: [PATCH 1/4] Rename config property to match common pattern --- .../main/java/io/trino/SystemSessionProperties.java | 2 +- .../java/io/trino/memory/MemoryManagerConfig.java | 12 ++++++------ .../io/trino/memory/TestMemoryManagerConfig.java | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index ab1ed86a057e..3580a9ee44bb 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -763,7 +763,7 @@ public SystemSessionProperties( dataSizeProperty( FAULT_TOLERANT_EXECUTION_TASK_MEMORY, "Estimated amount of memory a single task will use when task level retries are used; value is used allocating nodes for tasks execution", - memoryManagerConfig.getFaultTolerantTaskMemory(), + memoryManagerConfig.getFaultTolerantExecutionTaskMemory(), false), booleanProperty( ADAPTIVE_PARTIAL_AGGREGATION_ENABLED, diff --git a/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java b/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java index 419b4eb76700..27cecfb9d126 100644 --- a/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java @@ -35,13 +35,13 @@ "resources.reserved-system-memory"}) public class MemoryManagerConfig { - public static final String FAULT_TOLERANT_TASK_MEMORY_CONFIG = "fault-tolerant-task-memory"; + public static final String FAULT_TOLERANT_TASK_MEMORY_CONFIG = "fault-tolerant-execution-task-memory"; // enforced against user memory allocations private DataSize maxQueryMemory = DataSize.of(20, GIGABYTE); // enforced against user + system memory allocations (default is maxQueryMemory * 2) private DataSize maxQueryTotalMemory; - private DataSize faultTolerantTaskMemory = DataSize.of(1, GIGABYTE); + private DataSize faultTolerantExecutionTaskMemory = DataSize.of(1, GIGABYTE); private LowMemoryKillerPolicy lowMemoryKillerPolicy = LowMemoryKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES; private Duration killOnOutOfMemoryDelay = new Duration(5, MINUTES); @@ -102,16 +102,16 @@ public MemoryManagerConfig setMaxQueryTotalMemory(DataSize maxQueryTotalMemory) } @NotNull - public DataSize getFaultTolerantTaskMemory() + public DataSize getFaultTolerantExecutionTaskMemory() { - return faultTolerantTaskMemory; + return faultTolerantExecutionTaskMemory; } @Config(FAULT_TOLERANT_TASK_MEMORY_CONFIG) @ConfigDescription("Estimated amount of memory a single task will use when task level retries are used; value is used allocating nodes for tasks execution") - public MemoryManagerConfig setFaultTolerantTaskMemory(DataSize faultTolerantTaskMemory) + public MemoryManagerConfig setFaultTolerantExecutionTaskMemory(DataSize faultTolerantExecutionTaskMemory) { - this.faultTolerantTaskMemory = faultTolerantTaskMemory; + this.faultTolerantExecutionTaskMemory = faultTolerantExecutionTaskMemory; return this; } diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java index 6ca727e05930..8c0e47021292 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java @@ -39,7 +39,7 @@ public void testDefaults() .setKillOnOutOfMemoryDelay(new Duration(5, MINUTES)) .setMaxQueryMemory(DataSize.of(20, GIGABYTE)) .setMaxQueryTotalMemory(DataSize.of(40, GIGABYTE)) - .setFaultTolerantTaskMemory(DataSize.of(1, GIGABYTE))); + .setFaultTolerantExecutionTaskMemory(DataSize.of(1, GIGABYTE))); } @Test @@ -50,7 +50,7 @@ public void testExplicitPropertyMappings() .put("query.low-memory-killer.delay", "20s") .put("query.max-memory", "2GB") .put("query.max-total-memory", "3GB") - .put("fault-tolerant-task-memory", "2GB") + .put("fault-tolerant-execution-task-memory", "2GB") .buildOrThrow(); MemoryManagerConfig expected = new MemoryManagerConfig() @@ -58,7 +58,7 @@ public void testExplicitPropertyMappings() .setKillOnOutOfMemoryDelay(new Duration(20, SECONDS)) .setMaxQueryMemory(DataSize.of(2, GIGABYTE)) .setMaxQueryTotalMemory(DataSize.of(3, GIGABYTE)) - .setFaultTolerantTaskMemory(DataSize.of(2, GIGABYTE)); + .setFaultTolerantExecutionTaskMemory(DataSize.of(2, GIGABYTE)); assertFullMapping(properties, expected); } From e99d42b5f5ea6e1f09915da1814bffec9e58f66e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Thu, 10 Mar 2022 23:40:14 +0100 Subject: [PATCH 2/4] Handle task memory requiremetns which exceed node sizes If task memory requirements exceed memory capacity of any node in the cluster treat such task as if it would request full node. --- .../FullNodeCapableNodeAllocatorService.java | 21 +++++++++++++++-- .../TestFullNodeCapableNodeAllocator.java | 23 +++++++++++++++---- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FullNodeCapableNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FullNodeCapableNodeAllocatorService.java index bf9d14c0b478..8fa2d08528e5 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FullNodeCapableNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FullNodeCapableNodeAllocatorService.java @@ -53,6 +53,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkState; @@ -65,6 +66,7 @@ import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.trino.execution.scheduler.FallbackToFullNodePartitionMemoryEstimator.FULL_NODE_MEMORY; import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE; +import static java.lang.Math.max; import static java.lang.Thread.currentThread; import static java.util.Comparator.comparing; import static java.util.Objects.requireNonNull; @@ -110,6 +112,7 @@ public class FullNodeCapableNodeAllocatorService private final AtomicBoolean stopped = new AtomicBoolean(); private final Semaphore processSemaphore = new Semaphore(0); private final ConcurrentMap nodePoolSizes = new ConcurrentHashMap<>(); + private final AtomicLong maxNodePoolSize = new AtomicLong(FULL_NODE_MEMORY.toBytes()); @Inject public FullNodeCapableNodeAllocatorService( @@ -142,13 +145,20 @@ private void refreshNodePoolSizes() } } + long tmpMaxNodePoolSize = 0; for (Map.Entry> entry : workerMemoryInfo.entrySet()) { Optional memoryInfo = entry.getValue(); if (memoryInfo.isEmpty()) { continue; } - nodePoolSizes.put(entry.getKey(), memoryInfo.get().getPool().getMaxBytes()); + long nodePoolSize = memoryInfo.get().getPool().getMaxBytes(); + nodePoolSizes.put(entry.getKey(), nodePoolSize); + tmpMaxNodePoolSize = max(tmpMaxNodePoolSize, nodePoolSize); } + if (tmpMaxNodePoolSize == 0) { + tmpMaxNodePoolSize = FULL_NODE_MEMORY.toBytes(); + } + maxNodePoolSize.set(tmpMaxNodePoolSize); } private Optional getNodePoolSize(InternalNode internalNode) @@ -209,6 +219,13 @@ private void processSharedPendingAcquires() iterator.remove(); continue; } + if (pendingAcquire.getNodeRequirements().getMemory().toBytes() > maxNodePoolSize.get()) { + // nodes in the cluster shrank and what used to be a request for a shared node now is a request for full node + iterator.remove(); + detachedFullNodePendingAcquires.add(pendingAcquire); + continue; + } + try { Candidates candidates = selectCandidates(pendingAcquire.getNodeRequirements(), pendingAcquire.getNodeSelector()); if (candidates.isEmpty()) { @@ -659,6 +676,6 @@ public void release() private boolean isFullNode(NodeRequirements requirements) { - return requirements.getMemory().compareTo(FULL_NODE_MEMORY) >= 0; + return requirements.getMemory().toBytes() >= maxNodePoolSize.get(); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFullNodeCapableNodeAllocator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFullNodeCapableNodeAllocator.java index f0883beaad4c..905183517e97 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFullNodeCapableNodeAllocator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestFullNodeCapableNodeAllocator.java @@ -77,6 +77,8 @@ public class TestFullNodeCapableNodeAllocator private static final NodeRequirements FULL_NODE_3_REQUIREMENTS = new NodeRequirements(Optional.empty(), Set.of(NODE_3_ADDRESS), FULL_NODE_MEMORY); private static final NodeRequirements FULL_NODE_CATALOG_1_REQUIREMENTS = new NodeRequirements(Optional.of(CATALOG_1), Set.of(), FULL_NODE_MEMORY); private static final NodeRequirements FULL_NODE_CATALOG_2_REQUIREMENTS = new NodeRequirements(Optional.of(CATALOG_2), Set.of(), FULL_NODE_MEMORY); + // not using FULL_NODE_MEMORY marker but with memory requirements exceeding any node in cluster + private static final NodeRequirements EFFECTIVELY_FULL_NODE_REQUIREMENTS = new NodeRequirements(Optional.empty(), Set.of(), DataSize.of(65, GIGABYTE)); // none of the tests should require periodic execution of routine which processes pending acquisitions private static final long TEST_TIMEOUT = FullNodeCapableNodeAllocatorService.PROCESS_PENDING_ACQUIRES_DELAY_SECONDS * 1000 / 2; @@ -254,19 +256,32 @@ public void testRemoveAcquiredSharedNode() @Test(timeOut = TEST_TIMEOUT) public void testAllocateFullSimple() throws Exception + { + testAllocateFullSimple(FULL_NODE_REQUIREMENTS); + } + + @Test(timeOut = TEST_TIMEOUT) + public void testEffectivelyFullNodeSimple() + throws Exception + { + testAllocateFullSimple(EFFECTIVELY_FULL_NODE_REQUIREMENTS); + } + + private void testAllocateFullSimple(NodeRequirements fullNodeRequirements) + throws Exception { TestingNodeSupplier nodeSupplier = TestingNodeSupplier.create(basicNodesMap(NODE_1, NODE_2)); setupNodeAllocatorService(nodeSupplier, 3); try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(Q1_SESSION)) { // allocate 2 full nodes should not block - NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS); + NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(fullNodeRequirements); assertAcquired(acquire1); - NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS); + NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(fullNodeRequirements); assertAcquired(acquire2); // trying to allocate third full node should block - NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS); + NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(fullNodeRequirements); assertNotAcquired(acquire3); // third acquisition should unblock if one of old ones is released @@ -288,7 +303,7 @@ public void testAllocateFullSimple() }); // shared acquisition should block full acquisition - NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS); + NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(fullNodeRequirements); assertNotAcquired(acquire5); // and when shared acquisition is gone full node should be acquired From 1d38960cbd3add0b08a1d9f041a6e427f5e6139e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Thu, 10 Mar 2022 18:17:21 +0100 Subject: [PATCH 3/4] Bump default fault tolerant task memory --- .../src/main/java/io/trino/memory/MemoryManagerConfig.java | 2 +- .../src/test/java/io/trino/memory/TestMemoryManagerConfig.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java b/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java index 27cecfb9d126..0e0d2f51cda8 100644 --- a/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java @@ -41,7 +41,7 @@ public class MemoryManagerConfig private DataSize maxQueryMemory = DataSize.of(20, GIGABYTE); // enforced against user + system memory allocations (default is maxQueryMemory * 2) private DataSize maxQueryTotalMemory; - private DataSize faultTolerantExecutionTaskMemory = DataSize.of(1, GIGABYTE); + private DataSize faultTolerantExecutionTaskMemory = DataSize.of(4, GIGABYTE); private LowMemoryKillerPolicy lowMemoryKillerPolicy = LowMemoryKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES; private Duration killOnOutOfMemoryDelay = new Duration(5, MINUTES); diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java index 8c0e47021292..68dc2cb83bf6 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java @@ -39,7 +39,7 @@ public void testDefaults() .setKillOnOutOfMemoryDelay(new Duration(5, MINUTES)) .setMaxQueryMemory(DataSize.of(20, GIGABYTE)) .setMaxQueryTotalMemory(DataSize.of(40, GIGABYTE)) - .setFaultTolerantExecutionTaskMemory(DataSize.of(1, GIGABYTE))); + .setFaultTolerantExecutionTaskMemory(DataSize.of(4, GIGABYTE))); } @Test From b7f76a45d3fea1ac4943158dc83ef45b5b827e28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Thu, 10 Mar 2022 23:45:52 +0100 Subject: [PATCH 4/4] Request 1GB task memory in failure recovery tests Default value (4GB) does not match test environment well and would effectively require full node for each task. --- .../src/main/java/io/trino/testing/BaseFailureRecoveryTest.java | 1 + .../trino/testing/FaultTolerantExecutionConnectorTestHelper.java | 1 + 2 files changed, 2 insertions(+) diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java index b8d1ee7128a8..a960fc16f62a 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseFailureRecoveryTest.java @@ -110,6 +110,7 @@ protected final QueryRunner createQueryRunner() .put("query.hash-partition-count", "5") // to trigger spilling .put("exchange.deduplication-buffer-size", "1kB") + .put("fault-tolerant-execution-task-memory", "1GB") .buildOrThrow(), ImmutableMap.builder() // making http timeouts shorter so tests which simulate communication timeouts finish in reasonable amount of time diff --git a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java index efb7ee80848f..a0052881596c 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java @@ -33,6 +33,7 @@ public static Map getExtraProperties() // TODO: re-enable once failure recover supported for this functionality .put("enable-dynamic-filtering", "false") .put("distributed-sort", "false") + .put("fault-tolerant-execution-task-memory", "1GB") .buildOrThrow(); } }