Skip to content

Commit

Permalink
Allocate full node for task on last retry due to lack of memory
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Aug 19, 2024
1 parent e872728 commit 8edf5b4
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.stream.IntStream;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.units.DataSize.Unit.PETABYTE;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionDefaultCoordinatorTaskMemory;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionDefaultTaskMemory;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile;
Expand Down Expand Up @@ -176,9 +177,14 @@ public MemoryRequirements getNextRetryMemoryRequirements(MemoryRequirements prev
// start with the maximum of previously used memory and actual usage
DataSize newMemory = Ordering.natural().max(peakMemoryUsage, previousMemory);
if (shouldIncreaseMemoryRequirement(errorCode)) {
// multiply if we hit an oom error

newMemory = DataSize.of((long) (newMemory.toBytes() * growthFactor), DataSize.Unit.BYTE);
if (remainingAttempts == 1) {
// on last attempt try as much memory as possible
newMemory = DataSize.of(1, PETABYTE);
}
else {
// multiply if we hit an oom error
newMemory = DataSize.of((long) (newMemory.toBytes() * growthFactor), DataSize.Unit.BYTE);
}
}

// if we are still below current estimate for new partition let's bump further
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,32 @@ public void testEstimator()
5))
.isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));

assertThat(
estimator.getNextRetryMemoryRequirements(
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(10, MEGABYTE),
EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode(),
2))
.isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE)));

// on last retry we expect whole node on memory error
assertThat(
estimator.getNextRetryMemoryRequirements(
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(10, MEGABYTE),
StandardErrorCode.TOO_MANY_REQUESTS_FAILED.toErrorCode(),
1))
.isEqualTo(new MemoryRequirements(DataSize.of(64, GIGABYTE)));

// standard logic even for last retry on non memory related error
assertThat(
estimator.getNextRetryMemoryRequirements(
new MemoryRequirements(DataSize.of(50, MEGABYTE)),
DataSize.of(10, MEGABYTE),
StandardErrorCode.CORRUPT_PAGE.toErrorCode(),
1))
.isEqualTo(new MemoryRequirements(DataSize.of(50, MEGABYTE)));

// peak memory of failed task 70MB
assertThat(
estimator.getNextRetryMemoryRequirements(
Expand Down

0 comments on commit 8edf5b4

Please sign in to comment.