
Handle task memory requirements that exceed node sizes
If a task's memory requirements exceed the memory capacity of every node in the
cluster, treat that task as if it had requested a full node.
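In effect, the allocator now tracks the largest memory pool reported by any worker; an acquisition whose memory requirement is at least that large can only ever be satisfied by a whole node, so it is routed through the full-node path. The following is a minimal standalone sketch of that classification logic under stated assumptions; the class and method shapes are illustrative only (the real change is in FullNodeCapableNodeAllocatorService in the diff below), and the 64 GB constant is a stand-in for FULL_NODE_MEMORY.

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch only, not the actual Trino class.
class FullNodeClassifier
{
    // Stand-in for FULL_NODE_MEMORY: used until workers report their pool sizes.
    private static final long DEFAULT_FULL_NODE_BYTES = 64L * 1024 * 1024 * 1024;

    private final AtomicLong maxNodePoolSize = new AtomicLong(DEFAULT_FULL_NODE_BYTES);

    // Periodically refreshed from per-worker memory pool sizes (in bytes).
    void refreshNodePoolSizes(Map<String, Long> workerPoolSizes)
    {
        long max = workerPoolSizes.values().stream().mapToLong(Long::longValue).max().orElse(0);
        if (max == 0) {
            // No worker has reported yet; fall back to the static full-node size.
            max = DEFAULT_FULL_NODE_BYTES;
        }
        maxNodePoolSize.set(max);
    }

    // A task that cannot fit into the memory pool of any node is treated
    // as if it had requested a full node.
    boolean isFullNode(long requiredMemoryBytes)
    {
        return requiredMemoryBytes >= maxNodePoolSize.get();
    }
}

For example, if every worker reports a 32 GB pool, a 40 GB requirement classifies as a full-node request, while the same requirement against 64 GB workers is scheduled as a shared acquisition.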
losipiuk committed Mar 15, 2022
1 parent 20df89a commit 6e0a160
Showing 2 changed files with 38 additions and 6 deletions.
FullNodeCapableNodeAllocatorService.java
@@ -53,6 +53,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
@@ -65,6 +66,7 @@
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.execution.scheduler.FallbackToFullNodePartitionMemoryEstimator.FULL_NODE_MEMORY;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
+import static java.lang.Math.max;
import static java.lang.Thread.currentThread;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
@@ -110,6 +112,7 @@ public class FullNodeCapableNodeAllocatorService
private final AtomicBoolean stopped = new AtomicBoolean();
private final Semaphore processSemaphore = new Semaphore(0);
private final ConcurrentMap<String, Long> nodePoolSizes = new ConcurrentHashMap<>();
+private final AtomicLong maxNodePoolSize = new AtomicLong(FULL_NODE_MEMORY.toBytes());

@Inject
public FullNodeCapableNodeAllocatorService(
@@ -142,13 +145,20 @@ private void refreshNodePoolSizes()
}
}

+long tmpMaxNodePoolSize = 0;
for (Map.Entry<String, Optional<MemoryInfo>> entry : workerMemoryInfo.entrySet()) {
Optional<MemoryInfo> memoryInfo = entry.getValue();
if (memoryInfo.isEmpty()) {
continue;
}
-nodePoolSizes.put(entry.getKey(), memoryInfo.get().getPool().getMaxBytes());
+long nodePoolSize = memoryInfo.get().getPool().getMaxBytes();
+nodePoolSizes.put(entry.getKey(), nodePoolSize);
+tmpMaxNodePoolSize = max(tmpMaxNodePoolSize, nodePoolSize);
}
+if (tmpMaxNodePoolSize == 0) {
+tmpMaxNodePoolSize = FULL_NODE_MEMORY.toBytes();
+}
+maxNodePoolSize.set(tmpMaxNodePoolSize);
}

private Optional<Long> getNodePoolSize(InternalNode internalNode)
@@ -209,6 +219,13 @@ private void processSharedPendingAcquires()
iterator.remove();
continue;
}
+if (pendingAcquire.getNodeRequirements().getMemory().toBytes() > maxNodePoolSize.get()) {
+// nodes in the cluster shrank and what used to be a request for a shared node now is a request for full node
+iterator.remove();
+detachedFullNodePendingAcquires.add(pendingAcquire);
+continue;
+}

try {
Candidates candidates = selectCandidates(pendingAcquire.getNodeRequirements(), pendingAcquire.getNodeSelector());
if (candidates.isEmpty()) {
Expand Down Expand Up @@ -659,6 +676,6 @@ public void release()

private boolean isFullNode(NodeRequirements requirements)
{
-return requirements.getMemory().compareTo(FULL_NODE_MEMORY) >= 0;
+return requirements.getMemory().toBytes() >= maxNodePoolSize.get();
}
}
TestFullNodeCapableNodeAllocator.java
@@ -77,6 +77,8 @@ public class TestFullNodeCapableNodeAllocator
private static final NodeRequirements FULL_NODE_3_REQUIREMENTS = new NodeRequirements(Optional.empty(), Set.of(NODE_3_ADDRESS), FULL_NODE_MEMORY);
private static final NodeRequirements FULL_NODE_CATALOG_1_REQUIREMENTS = new NodeRequirements(Optional.of(CATALOG_1), Set.of(), FULL_NODE_MEMORY);
private static final NodeRequirements FULL_NODE_CATALOG_2_REQUIREMENTS = new NodeRequirements(Optional.of(CATALOG_2), Set.of(), FULL_NODE_MEMORY);
+// not using FULL_NODE_MEMORY marker but with memory requirements exceeding any node in cluster
+private static final NodeRequirements EFFECTIVELY_FULL_NODE_REQUIREMENTS = new NodeRequirements(Optional.empty(), Set.of(), DataSize.of(65, GIGABYTE));

// none of the tests should require periodic execution of routine which processes pending acquisitions
private static final long TEST_TIMEOUT = FullNodeCapableNodeAllocatorService.PROCESS_PENDING_ACQUIRES_DELAY_SECONDS * 1000 / 2;
@@ -254,19 +256,32 @@ public void testRemoveAcquiredSharedNode()
@Test(timeOut = TEST_TIMEOUT)
public void testAllocateFullSimple()
throws Exception
{
+testAllocateFullSimple(FULL_NODE_REQUIREMENTS);
+}
+
+@Test(timeOut = TEST_TIMEOUT)
+public void testEffectivelyFullNodeSimple()
+throws Exception
+{
+testAllocateFullSimple(EFFECTIVELY_FULL_NODE_REQUIREMENTS);
+}
+
+private void testAllocateFullSimple(NodeRequirements fullNodeRequirements)
+throws Exception
+{
TestingNodeSupplier nodeSupplier = TestingNodeSupplier.create(basicNodesMap(NODE_1, NODE_2));
setupNodeAllocatorService(nodeSupplier, 3);

try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(Q1_SESSION)) {
// allocate 2 full nodes should not block
-NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS);
+NodeAllocator.NodeLease acquire1 = nodeAllocator.acquire(fullNodeRequirements);
assertAcquired(acquire1);
-NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS);
+NodeAllocator.NodeLease acquire2 = nodeAllocator.acquire(fullNodeRequirements);
assertAcquired(acquire2);

// trying to allocate third full node should block
-NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS);
+NodeAllocator.NodeLease acquire3 = nodeAllocator.acquire(fullNodeRequirements);
assertNotAcquired(acquire3);

// third acquisition should unblock if one of old ones is released
@@ -288,7 +303,7 @@ public void testAllocateFullSimple()
});

// shared acquisition should block full acquisition
-NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(FULL_NODE_REQUIREMENTS);
+NodeAllocator.NodeLease acquire5 = nodeAllocator.acquire(fullNodeRequirements);
assertNotAcquired(acquire5);

// and when shared acquisition is gone full node should be acquired
