Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump default fault tolerant task memory #11419

Merged
merged 4 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ public SystemSessionProperties(
dataSizeProperty(
FAULT_TOLERANT_EXECUTION_TASK_MEMORY,
"Estimated amount of memory a single task will use when task level retries are used; value is used allocating nodes for tasks execution",
memoryManagerConfig.getFaultTolerantTaskMemory(),
memoryManagerConfig.getFaultTolerantExecutionTaskMemory(),
false),
booleanProperty(
ADAPTIVE_PARTIAL_AGGREGATION_ENABLED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
Expand All @@ -65,6 +66,7 @@
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.execution.scheduler.FallbackToFullNodePartitionMemoryEstimator.FULL_NODE_MEMORY;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static java.lang.Math.max;
import static java.lang.Thread.currentThread;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -110,6 +112,7 @@ public class FullNodeCapableNodeAllocatorService
private final AtomicBoolean stopped = new AtomicBoolean();
private final Semaphore processSemaphore = new Semaphore(0);
private final ConcurrentMap<String, Long> nodePoolSizes = new ConcurrentHashMap<>();
private final AtomicLong maxNodePoolSize = new AtomicLong(FULL_NODE_MEMORY.toBytes());

@Inject
public FullNodeCapableNodeAllocatorService(
Expand Down Expand Up @@ -142,13 +145,20 @@ private void refreshNodePoolSizes()
}
}

long tmpMaxNodePoolSize = 0;
for (Map.Entry<String, Optional<MemoryInfo>> entry : workerMemoryInfo.entrySet()) {
Optional<MemoryInfo> memoryInfo = entry.getValue();
if (memoryInfo.isEmpty()) {
continue;
}
nodePoolSizes.put(entry.getKey(), memoryInfo.get().getPool().getMaxBytes());
long nodePoolSize = memoryInfo.get().getPool().getMaxBytes();
nodePoolSizes.put(entry.getKey(), nodePoolSize);
tmpMaxNodePoolSize = max(tmpMaxNodePoolSize, nodePoolSize);
}
if (tmpMaxNodePoolSize == 0) {
tmpMaxNodePoolSize = FULL_NODE_MEMORY.toBytes();
}
maxNodePoolSize.set(tmpMaxNodePoolSize);
}

private Optional<Long> getNodePoolSize(InternalNode internalNode)
Expand Down Expand Up @@ -209,6 +219,13 @@ private void processSharedPendingAcquires()
iterator.remove();
continue;
}
if (pendingAcquire.getNodeRequirements().getMemory().toBytes() > maxNodePoolSize.get()) {
// nodes in the cluster shrank, so what used to be a request for a shared node is now a request for a full node
iterator.remove();
detachedFullNodePendingAcquires.add(pendingAcquire);
continue;
}

try {
Candidates candidates = selectCandidates(pendingAcquire.getNodeRequirements(), pendingAcquire.getNodeSelector());
if (candidates.isEmpty()) {
Expand Down Expand Up @@ -659,6 +676,6 @@ public void release()

private boolean isFullNode(NodeRequirements requirements)
{
return requirements.getMemory().compareTo(FULL_NODE_MEMORY) >= 0;
return requirements.getMemory().toBytes() >= maxNodePoolSize.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
"resources.reserved-system-memory"})
public class MemoryManagerConfig
{
public static final String FAULT_TOLERANT_TASK_MEMORY_CONFIG = "fault-tolerant-task-memory";
public static final String FAULT_TOLERANT_TASK_MEMORY_CONFIG = "fault-tolerant-execution-task-memory";

// enforced against user memory allocations
private DataSize maxQueryMemory = DataSize.of(20, GIGABYTE);
// enforced against user + system memory allocations (default is maxQueryMemory * 2)
private DataSize maxQueryTotalMemory;
private DataSize faultTolerantTaskMemory = DataSize.of(1, GIGABYTE);
private DataSize faultTolerantExecutionTaskMemory = DataSize.of(4, GIGABYTE);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally this would be defined as a fraction of the worker heap, but that information is not statically available on the coordinator. Also, in theory, every worker can be a different size.

private LowMemoryKillerPolicy lowMemoryKillerPolicy = LowMemoryKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES;
private Duration killOnOutOfMemoryDelay = new Duration(5, MINUTES);

Expand Down Expand Up @@ -102,16 +102,16 @@ public MemoryManagerConfig setMaxQueryTotalMemory(DataSize maxQueryTotalMemory)
}

@NotNull
public DataSize getFaultTolerantTaskMemory()
public DataSize getFaultTolerantExecutionTaskMemory()
{
return faultTolerantTaskMemory;
return faultTolerantExecutionTaskMemory;
}

@Config(FAULT_TOLERANT_TASK_MEMORY_CONFIG)
@ConfigDescription("Estimated amount of memory a single task will use when task level retries are used; value is used allocating nodes for tasks execution")
public MemoryManagerConfig setFaultTolerantTaskMemory(DataSize faultTolerantTaskMemory)
public MemoryManagerConfig setFaultTolerantExecutionTaskMemory(DataSize faultTolerantExecutionTaskMemory)
{
this.faultTolerantTaskMemory = faultTolerantTaskMemory;
this.faultTolerantExecutionTaskMemory = faultTolerantExecutionTaskMemory;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public class TestFullNodeCapableNodeAllocator
private static final NodeRequirements FULL_NODE_3_REQUIREMENTS = new NodeRequirements(Optional.empty(), Set.of(NODE_3_ADDRESS), FULL_NODE_MEMORY);
private static final NodeRequirements FULL_NODE_CATALOG_1_REQUIREMENTS = new NodeRequirements(Optional.of(CATALOG_1), Set.of(), FULL_NODE_MEMORY);
private static final NodeRequirements FULL_NODE_CATALOG_2_REQUIREMENTS = new NodeRequirements(Optional.of(CATALOG_2), Set.of(), FULL_NODE_MEMORY);
// not using FULL_NODE_MEMORY marker but with memory requirements exceeding any node in cluster
private static final NodeRequirements EFFECTIVELY_FULL_NODE_REQUIREMENTS = new NodeRequirements(Optional.empty(), Set.of(), DataSize.of(65, GIGABYTE));

// none of the tests should require periodic execution of routine which processes pending acquisitions
private static final long TEST_TIMEOUT = FullNodeCapableNodeAllocatorService.PROCESS_PENDING_ACQUIRES_DELAY_SECONDS * 1000 / 2;
Expand Down Expand Up @@ -254,19 +256,32 @@ public void testRemoveAcquiredSharedNode()
@Test(timeOut = TEST_TIMEOUT)
public void testAllocateFullSimple()
throws Exception
{
testAllocateFullSimple(FULL_NODE_REQUIREMENTS);
}

@Test(timeOut = TEST_TIMEOUT)
public void testEffectivelyFullNodeSimple()
throws Exception
{
testAllocateFullSimple(EFFECTIVELY_FULL_NODE_REQUIREMENTS);
}

private void testAllocateFullSimple(NodeRequirements fullNodeRequirements)
throws Exception
{
TestingNodeSupplier nodeSupplier = TestingNodeSupplier.create(basicNodesMap(NODE_1, NODE_2));
setupNodeAllocatorService(nodeSupplier, 3);

try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(Q1_SESSION)) {
// allocate 2 full nodes should not block
NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS);
NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(fullNodeRequirements);
assertAcquired(acquire1);
NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS);
NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(fullNodeRequirements);
assertAcquired(acquire2);

// trying to allocate third full node should block
NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS);
NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(fullNodeRequirements);
assertNotAcquired(acquire3);

// third acquisition should unblock if one of old ones is released
Expand All @@ -288,7 +303,7 @@ public void testAllocateFullSimple()
});

// shared acquisition should block full acquisition
NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS);
NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(fullNodeRequirements);
assertNotAcquired(acquire5);

// and when shared acquisition is gone full node should be acquired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testDefaults()
.setKillOnOutOfMemoryDelay(new Duration(5, MINUTES))
.setMaxQueryMemory(DataSize.of(20, GIGABYTE))
.setMaxQueryTotalMemory(DataSize.of(40, GIGABYTE))
.setFaultTolerantTaskMemory(DataSize.of(1, GIGABYTE)));
.setFaultTolerantExecutionTaskMemory(DataSize.of(4, GIGABYTE)));
}

@Test
Expand All @@ -50,15 +50,15 @@ public void testExplicitPropertyMappings()
.put("query.low-memory-killer.delay", "20s")
.put("query.max-memory", "2GB")
.put("query.max-total-memory", "3GB")
.put("fault-tolerant-task-memory", "2GB")
.put("fault-tolerant-execution-task-memory", "2GB")
.buildOrThrow();

MemoryManagerConfig expected = new MemoryManagerConfig()
.setLowMemoryKillerPolicy(NONE)
.setKillOnOutOfMemoryDelay(new Duration(20, SECONDS))
.setMaxQueryMemory(DataSize.of(2, GIGABYTE))
.setMaxQueryTotalMemory(DataSize.of(3, GIGABYTE))
.setFaultTolerantTaskMemory(DataSize.of(2, GIGABYTE));
.setFaultTolerantExecutionTaskMemory(DataSize.of(2, GIGABYTE));

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ protected final QueryRunner createQueryRunner()
.put("query.hash-partition-count", "5")
// to trigger spilling
.put("exchange.deduplication-buffer-size", "1kB")
.put("fault-tolerant-execution-task-memory", "1GB")
.buildOrThrow(),
ImmutableMap.<String, String>builder()
// making http timeouts shorter so tests which simulate communication timeouts finish in reasonable amount of time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public static Map<String, String> getExtraProperties()
// TODO: re-enable once failure recover supported for this functionality
.put("enable-dynamic-filtering", "false")
.put("distributed-sort", "false")
.put("fault-tolerant-execution-task-memory", "1GB")
.buildOrThrow();
}
}