From 099ea404499e8786d43129207ce77864dd88dd5e Mon Sep 17 00:00:00 2001 From: Andrii Rosa Date: Mon, 31 Jan 2022 13:51:03 -0500 Subject: [PATCH] Implement memory accounting for TaskDescriptor --- .../java/io/trino/execution/Lifespan.java | 8 ++++ .../execution/scheduler/NodeRequirements.java | 12 ++++++ .../scheduler/StageTaskSourceFactory.java | 25 ++++++------ .../execution/scheduler/TaskDescriptor.java | 40 ++++++++++++++----- .../main/java/io/trino/metadata/Split.java | 11 +++++ .../io/trino/sql/planner/plan/PlanNodeId.java | 10 +++++ .../scheduler/TestStageTaskSourceFactory.java | 5 ++- .../scheduler/TestingTaskSourceFactory.java | 7 ++-- 8 files changed, 91 insertions(+), 27 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/Lifespan.java b/core/trino-main/src/main/java/io/trino/execution/Lifespan.java index fbec577b173d..8be2f53d77aa 100644 --- a/core/trino-main/src/main/java/io/trino/execution/Lifespan.java +++ b/core/trino-main/src/main/java/io/trino/execution/Lifespan.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; +import org.openjdk.jol.info.ClassLayout; import java.util.Objects; @@ -25,6 +26,8 @@ public class Lifespan { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(Lifespan.class).instanceSize(); + private static final Lifespan TASK_WIDE = new Lifespan(false, 0); private final boolean grouped; @@ -93,4 +96,9 @@ public int hashCode() { return Objects.hash(grouped, groupId); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE; + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeRequirements.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeRequirements.java index 30921d6999ae..5ce6e1e126c4 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeRequirements.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeRequirements.java @@ -16,16 +16,21 @@ import com.google.common.collect.ImmutableSet; import io.trino.connector.CatalogName; import io.trino.spi.HostAddress; +import org.openjdk.jol.info.ClassLayout; import java.util.Objects; import java.util.Optional; import java.util.Set; import static com.google.common.base.MoreObjects.toStringHelper; +import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.airlift.slice.SizeOf.sizeOf; import static java.util.Objects.requireNonNull; public class NodeRequirements { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(NodeRequirements.class).instanceSize(); + private final Optional catalogName; private final Set addresses; @@ -78,4 +83,11 @@ public String toString() .add("addresses", addresses) .toString(); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + sizeOf(catalogName, CatalogName::getRetainedSizeInBytes) + + estimatedSizeOf(addresses, HostAddress::getRetainedSizeInBytes); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java index b0c0c492961c..1a175a21d4d8 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.log.Logger; @@ -163,7 +164,7 @@ else if (partitioning.equals(SOURCE_DISTRIBUTION)) { public static class SingleDistributionTaskSource implements TaskSource { - private final Multimap exchangeSourceHandles; + private final ListMultimap exchangeSourceHandles; private boolean finished; @@ -173,9 +174,9 @@ public static SingleDistributionTaskSource create(PlanFragment fragment, Multima return new SingleDistributionTaskSource(getInputsForRemoteSources(fragment.getRemoteSourceNodes(), exchangeSourceHandles)); } - public SingleDistributionTaskSource(Multimap exchangeSourceHandles) + public SingleDistributionTaskSource(ListMultimap exchangeSourceHandles) { - this.exchangeSourceHandles = ImmutableMultimap.copyOf(requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null")); + this.exchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null")); } @Override @@ -183,7 +184,7 @@ public List getMoreTasks() { List result = ImmutableList.of(new TaskDescriptor( 0, - ImmutableMultimap.of(), + ImmutableListMultimap.of(), exchangeSourceHandles, new NodeRequirements(Optional.empty(), ImmutableSet.of()))); finished = true; @@ -380,7 +381,7 @@ public List getMoreTasks() return ImmutableList.of(); } - Map> partitionToSplitsMap = new HashMap<>(); + Map> partitionToSplitsMap = new HashMap<>(); Map partitionToNodeMap = new HashMap<>(); for (Map.Entry entry : splitSources.entrySet()) { SplitSource splitSource = entry.getValue(); @@ -426,8 +427,8 @@ public List getMoreTasks() int taskPartitionId = 0; ImmutableList.Builder result = ImmutableList.builder(); for (Integer partition : union(partitionToSplitsMap.keySet(), partitionToExchangeSourceHandlesMap.keySet())) { - Multimap splits = partitionToSplitsMap.getOrDefault(partition, ImmutableMultimap.of()); - Multimap exchangeSourceHandles = ImmutableListMultimap.builder() + ListMultimap splits = partitionToSplitsMap.getOrDefault(partition, ImmutableListMultimap.of()); + ListMultimap exchangeSourceHandles = ImmutableListMultimap.builder() .putAll(partitionToExchangeSourceHandlesMap.getOrDefault(partition, ImmutableMultimap.of())) .putAll(replicatedExchangeSourceHandles) .build(); @@ -476,7 +477,7 @@ public static class SourceDistributionTaskSource private final PlanNodeId partitionedSourceNodeId; private final TableExecuteContextManager tableExecuteContextManager; private final SplitSource splitSource; - private final Multimap replicatedExchangeSourceHandles; + private final ListMultimap replicatedExchangeSourceHandles; private final int splitBatchSize; private final LongConsumer getSplitTimeRecorder; private final Optional catalogRequirement; @@ -528,7 +529,7 @@ public SourceDistributionTaskSource( PlanNodeId partitionedSourceNodeId, TableExecuteContextManager tableExecuteContextManager, SplitSource splitSource, - Multimap replicatedExchangeSourceHandles, + ListMultimap replicatedExchangeSourceHandles, int splitBatchSize, LongConsumer getSplitTimeRecorder, Optional catalogRequirement, @@ -674,7 +675,7 @@ public void close() } } - private static Multimap getReplicatedExchangeSourceHandles(PlanFragment fragment, Multimap handles) + private static ListMultimap getReplicatedExchangeSourceHandles(PlanFragment fragment, Multimap handles) { return getInputsForRemoteSources( fragment.getRemoteSourceNodes().stream() @@ -683,7 +684,7 @@ private static Multimap getReplicatedExchangeS handles); } - private static Multimap getPartitionedExchangeSourceHandles(PlanFragment fragment, Multimap handles) + private static ListMultimap getPartitionedExchangeSourceHandles(PlanFragment fragment, Multimap handles) { return getInputsForRemoteSources( fragment.getRemoteSourceNodes().stream() @@ -703,7 +704,7 @@ private static Map getSourceFragmentToRemoteSourceNo return result.build(); } - private static Multimap getInputsForRemoteSources( + private static ListMultimap getInputsForRemoteSources( List remoteSources, Multimap exchangeSourceHandles) { diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptor.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptor.java index 6d80fb56b161..02e57a9842e5 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptor.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/TaskDescriptor.java @@ -13,33 +13,40 @@ */ package io.trino.execution.scheduler; -import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.Multimap; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.ListMultimap; import io.trino.metadata.Split; import io.trino.spi.exchange.ExchangeSourceHandle; import io.trino.sql.planner.plan.PlanNodeId; +import org.openjdk.jol.info.ClassLayout; import java.util.Objects; import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.collect.Multimaps.asMap; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; public class TaskDescriptor { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(TaskDescriptor.class).instanceSize(); + private final int partitionId; - private final Multimap splits; - private final Multimap exchangeSourceHandles; + private final ListMultimap splits; + private final ListMultimap exchangeSourceHandles; private final NodeRequirements nodeRequirements; + private transient volatile long retainedSizeInBytes; + public TaskDescriptor( int partitionId, - Multimap splits, - Multimap exchangeSourceHandles, + ListMultimap splits, + ListMultimap exchangeSourceHandles, NodeRequirements nodeRequirements) { this.partitionId = partitionId; - this.splits = ImmutableMultimap.copyOf(requireNonNull(splits, "splits is null")); - this.exchangeSourceHandles = ImmutableMultimap.copyOf(requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null")); + this.splits = ImmutableListMultimap.copyOf(requireNonNull(splits, "splits is null")); + this.exchangeSourceHandles = ImmutableListMultimap.copyOf(requireNonNull(exchangeSourceHandles, "exchangeSourceHandles is null")); this.nodeRequirements = requireNonNull(nodeRequirements, "nodeRequirements is null"); } @@ -48,12 +55,12 @@ public int getPartitionId() return partitionId; } - public Multimap getSplits() + public ListMultimap getSplits() { return splits; } - public Multimap getExchangeSourceHandles() + public ListMultimap getExchangeSourceHandles() { return exchangeSourceHandles; } @@ -92,4 +99,17 @@ public String toString() .add("nodeRequirements", nodeRequirements) .toString(); } + + public long getRetainedSizeInBytes() + { + long result = retainedSizeInBytes; + if (result == 0) { + result = INSTANCE_SIZE + + estimatedSizeOf(asMap(splits), PlanNodeId::getRetainedSizeInBytes, splits -> estimatedSizeOf(splits, Split::getRetainedSizeInBytes)) + + estimatedSizeOf(asMap(exchangeSourceHandles), PlanNodeId::getRetainedSizeInBytes, handles -> estimatedSizeOf(handles, ExchangeSourceHandle::getRetainedSizeInBytes)) + + nodeRequirements.getRetainedSizeInBytes(); + retainedSizeInBytes = result; + } + return result; + } } diff --git a/core/trino-main/src/main/java/io/trino/metadata/Split.java b/core/trino-main/src/main/java/io/trino/metadata/Split.java index 43695d41d44d..e462afedfec8 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Split.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Split.java @@ -20,6 +20,7 @@ import io.trino.spi.HostAddress; import io.trino.spi.SplitWeight; import io.trino.spi.connector.ConnectorSplit; +import org.openjdk.jol.info.ClassLayout; import java.util.List; @@ -28,6 +29,8 @@ public final class Split { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(Split.class).instanceSize(); + private final CatalogName catalogName; private final ConnectorSplit connectorSplit; private final Lifespan lifespan; @@ -90,4 +93,12 @@ public String toString() .add("lifespan", lifespan) .toString(); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + catalogName.getRetainedSizeInBytes() + + connectorSplit.getRetainedSizeInBytes() + + lifespan.getRetainedSizeInBytes(); + } } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNodeId.java b/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNodeId.java index 3cf18b6114c0..429c88f3b1da 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNodeId.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/plan/PlanNodeId.java @@ -15,14 +15,18 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; +import org.openjdk.jol.info.ClassLayout; import javax.annotation.concurrent.Immutable; +import static io.airlift.slice.SizeOf.estimatedSizeOf; import static java.util.Objects.requireNonNull; @Immutable public class PlanNodeId { + private static final int INSTANCE_SIZE = ClassLayout.parseClass(PlanNodeId.class).instanceSize(); + private final String id; @JsonCreator @@ -58,4 +62,10 @@ public int hashCode() { return id.hashCode(); } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + estimatedSizeOf(id); + } } diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java index 1b58f18f2cf2..c663f6491c73 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimap; import io.airlift.units.DataSize; import io.trino.connector.CatalogName; @@ -73,7 +74,7 @@ public class TestStageTaskSourceFactory @Test public void testSingleDistributionTaskSource() { - Multimap sources = ImmutableListMultimap.builder() + ListMultimap sources = ImmutableListMultimap.builder() .put(PLAN_NODE_1, new TestingExchangeSourceHandle(0, 123)) .put(PLAN_NODE_2, new TestingExchangeSourceHandle(0, 321)) .put(PLAN_NODE_1, new TestingExchangeSourceHandle(0, 222)) @@ -489,7 +490,7 @@ public void testSourceDistributionTaskSource() private static SourceDistributionTaskSource createSourceDistributionTaskSource( List splits, - Multimap replicatedSources, + ListMultimap replicatedSources, int splitBatchSize, int splitsPerTask) { diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java index bde51232151d..3b8ea14b08f2 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestingTaskSourceFactory.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Multimap; import io.trino.Session; import io.trino.connector.CatalogName; @@ -76,7 +77,7 @@ public TaskSource create( getHandlesForRemoteSources(fragment.getRemoteSourceNodes(), exchangeSourceHandles)); } - private static Multimap getHandlesForRemoteSources( + private static ListMultimap getHandlesForRemoteSources( List remoteSources, Multimap exchangeSourceHandles) { @@ -99,7 +100,7 @@ public static class TestingTaskSource private final Iterator splits; private final int tasksPerBatch; private final PlanNodeId tableScanPlanNodeId; - private final Multimap exchangeSourceHandles; + private final ListMultimap exchangeSourceHandles; private final AtomicInteger nextPartitionId = new AtomicInteger(); @@ -108,7 +109,7 @@ public TestingTaskSource( List splits, int tasksPerBatch, PlanNodeId tableScanPlanNodeId, - Multimap exchangeSourceHandles) + ListMultimap exchangeSourceHandles) { this.catalogRequirement = requireNonNull(catalogRequirement, "catalogRequirement is null"); this.splits = ImmutableList.copyOf(requireNonNull(splits, "splits is null")).iterator();