diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java index 7dec9e89bbd1..03d9e39e7791 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java @@ -624,9 +624,7 @@ public MemoryRequirements getInitialMemoryRequirements(Session session, DataSize { DataSize memory = Ordering.natural().max(defaultMemoryLimit, getEstimatedMemoryUsage(session)); memory = capMemoryToMaxNodeSize(memory); - return new MemoryRequirements( - memory, - false); + return new MemoryRequirements(memory); } @Override @@ -646,7 +644,7 @@ public MemoryRequirements getNextRetryMemoryRequirements(Session session, Memory newMemory = Ordering.natural().max(newMemory, getEstimatedMemoryUsage(session)); newMemory = capMemoryToMaxNodeSize(newMemory); - return new MemoryRequirements(newMemory, false); + return new MemoryRequirements(newMemory); } private DataSize capMemoryToMaxNodeSize(DataSize memory) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ConstantPartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ConstantPartitionMemoryEstimator.java index d65615da50e5..b26dc58c22d1 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ConstantPartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ConstantPartitionMemoryEstimator.java @@ -25,9 +25,7 @@ public class ConstantPartitionMemoryEstimator @Override public MemoryRequirements getInitialMemoryRequirements(Session session, DataSize defaultMemoryLimit) { - return new MemoryRequirements( - defaultMemoryLimit, - true); + return new MemoryRequirements(defaultMemoryLimit); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FallbackToFullNodePartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FallbackToFullNodePartitionMemoryEstimator.java index 0a2dc40235bf..730ceffe9721 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FallbackToFullNodePartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FallbackToFullNodePartitionMemoryEstimator.java @@ -28,14 +28,12 @@ public class FallbackToFullNodePartitionMemoryEstimator // temporarily express full-node requirement as huge amount of memory public static final DataSize FULL_NODE_MEMORY = DataSize.of(512, DataSize.Unit.GIGABYTE); - private static final MemoryRequirements FULL_NODE_MEMORY_REQUIREMENTS = new MemoryRequirements(FULL_NODE_MEMORY, true); + private static final MemoryRequirements FULL_NODE_MEMORY_REQUIREMENTS = new MemoryRequirements(FULL_NODE_MEMORY); @Override public MemoryRequirements getInitialMemoryRequirements(Session session, DataSize defaultMemoryLimit) { - return new MemoryRequirements( - defaultMemoryLimit, - false); + return new MemoryRequirements(defaultMemoryLimit); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PartitionMemoryEstimator.java index 821fe87bad94..0fccfb1795f5 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/PartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PartitionMemoryEstimator.java @@ -34,12 +34,10 @@ public interface PartitionMemoryEstimator class MemoryRequirements { private final DataSize requiredMemory; - private final boolean limitReached; - MemoryRequirements(DataSize requiredMemory, boolean limitReached) + MemoryRequirements(DataSize requiredMemory) { this.requiredMemory = requireNonNull(requiredMemory, "requiredMemory is null"); - this.limitReached = limitReached; } public DataSize getRequiredMemory() @@ -47,11 +45,6 @@ public DataSize getRequiredMemory() return requiredMemory; } - public boolean isLimitReached() - { - return limitReached; - } - @Override public boolean equals(Object o) { @@ -62,13 +55,13 @@ public boolean equals(Object o) return false; } MemoryRequirements that = (MemoryRequirements) o; - return limitReached == that.limitReached && Objects.equals(requiredMemory, that.requiredMemory); + return Objects.equals(requiredMemory, that.requiredMemory); } @Override public int hashCode() { - return Objects.hash(requiredMemory, limitReached); + return Objects.hash(requiredMemory); } @Override @@ -76,7 +69,6 @@ public String toString() { return toStringHelper(this) .add("requiredMemory", requiredMemory) - .add("limitReached", limitReached) .toString(); } } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java index f84a94ededeb..a56bd7c0e6c1 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java @@ -60,106 +60,106 @@ public void testEstimator() Session session = TestingSession.testSessionBuilder().build(); assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(107, MEGABYTE))) - .isEqualTo(new MemoryRequirements(DataSize.of(107, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(107, MEGABYTE))); // peak memory of failed task 10MB assertThat( estimator.getNextRetryMemoryRequirements( session, - new MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE)), DataSize.of(10, MEGABYTE), StandardErrorCode.CORRUPT_PAGE.toErrorCode())) - .isEqualTo(new MemoryRequirements(DataSize.of(50, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(50, MEGABYTE))); assertThat( estimator.getNextRetryMemoryRequirements( session, - new MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE)), DataSize.of(10, MEGABYTE), StandardErrorCode.CLUSTER_OUT_OF_MEMORY.toErrorCode())) - .isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE))); assertThat( estimator.getNextRetryMemoryRequirements( session, - new MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE)), DataSize.of(10, MEGABYTE), EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())) - .isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE))); // peak memory of failed task 70MB assertThat( estimator.getNextRetryMemoryRequirements( session, - new MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE)), DataSize.of(70, MEGABYTE), StandardErrorCode.CORRUPT_PAGE.toErrorCode())) - .isEqualTo(new MemoryRequirements(DataSize.of(70, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(70, MEGABYTE))); assertThat( estimator.getNextRetryMemoryRequirements( session, - new MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE)), DataSize.of(70, MEGABYTE), EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())) - .isEqualTo(new MemoryRequirements(DataSize.of(210, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(210, MEGABYTE))); // register a couple successful attempts; 90th percentile is at 300MB - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(1000, MEGABYTE), true, Optional.empty()); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(100, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(1000, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(100, MEGABYTE), true, Optional.empty()); // for initial we should pick estimate if greater than default assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(100, MEGABYTE))) - .isEqualTo(new MemoryRequirements(DataSize.of(300, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(300, MEGABYTE))); // if default memory requirements is greater than estimate it should be picked still assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(500, MEGABYTE))) - .isEqualTo(new MemoryRequirements(DataSize.of(500, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(500, MEGABYTE))); // for next we should still pick current initial if greater assertThat( estimator.getNextRetryMemoryRequirements( session, - new MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE)), DataSize.of(70, MEGABYTE), EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())) - .isEqualTo(new MemoryRequirements(DataSize.of(300, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(300, MEGABYTE))); // a couple oom errors are registered - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(200, MEGABYTE), false, Optional.of(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(200, MEGABYTE), true, Optional.of(CLUSTER_OUT_OF_MEMORY.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(200, MEGABYTE), false, Optional.of(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(200, MEGABYTE), true, Optional.of(CLUSTER_OUT_OF_MEMORY.toErrorCode())); // 90th percentile should be now at 200*3 (600) assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(100, MEGABYTE))) - .isEqualTo(new MemoryRequirements(DataSize.of(600, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(600, MEGABYTE))); // a couple oom errors are registered with requested memory greater than peak - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(300, MEGABYTE), false), DataSize.of(200, MEGABYTE), false, Optional.of(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(300, MEGABYTE), false), DataSize.of(200, MEGABYTE), false, Optional.of(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(300, MEGABYTE), false), DataSize.of(200, MEGABYTE), true, Optional.of(CLUSTER_OUT_OF_MEMORY.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(300, MEGABYTE)), DataSize.of(200, MEGABYTE), false, Optional.of(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(300, MEGABYTE)), DataSize.of(200, MEGABYTE), false, Optional.of(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(300, MEGABYTE)), DataSize.of(200, MEGABYTE), true, Optional.of(CLUSTER_OUT_OF_MEMORY.toErrorCode())); // 90th percentile should be now at 300*3 (900) assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(100, MEGABYTE))) - .isEqualTo(new MemoryRequirements(DataSize.of(900, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(900, MEGABYTE))); // other errors should not change estimate - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); - estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE)), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(100, MEGABYTE))) - .isEqualTo(new MemoryRequirements(DataSize.of(900, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(900, MEGABYTE))); } private MemoryInfo buildWorkerMemoryInfo(DataSize usedMemory)