From 3c4ffba9a6feaf098061d96df0f07af1d6ec09d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Wed, 22 May 2024 17:39:27 +0200 Subject: [PATCH 1/4] Fix exposing speculative execution fraction stats --- .../scheduler/faulttolerant/StageExecutionStats.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/StageExecutionStats.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/StageExecutionStats.java index 35acd54381bd..b838a8e04b15 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/StageExecutionStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/StageExecutionStats.java @@ -48,12 +48,17 @@ public void recordSourcesFinishedOnStageStart(int sourcesCount) updateSourceOutputEstimationKindCounter("finished", sourcesCount); } - @Managed public void recordStageSpeculativeExecutionFraction(double fractionSpentSpeculative) { speculativeExecutionFractionDistribution.add((long) (fractionSpentSpeculative * EXECUTION_FRACTION_RESCALE_FACTOR)); } + @Managed + public DistributionStat getSpeculativeExecutionFraction() + { + return speculativeExecutionFractionDistribution; + } + private void updateSourceOutputEstimationKindCounter(String outputEstimationKind, int sourcesCount) { getCounterStat(outputEstimationKind).update(sourcesCount); From 8568b35a2661d5d988c5d92269364c798022f4a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Wed, 22 May 2024 17:42:26 +0200 Subject: [PATCH 2/4] Simplify speculative execution fraction calculation --- .../EventDrivenFaultTolerantQueryScheduler.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 01ae5da6010d..54368ac10709 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -185,6 +185,7 @@ import static io.trino.sql.planner.TopologicalOrderSubPlanVisitor.sortPlanInTopologicalOrder; import static io.trino.tracing.TrinoAttributes.FAILURE_MESSAGE; import static io.trino.util.Failures.toFailure; +import static java.lang.Math.clamp; import static java.lang.Math.max; import static java.lang.Math.min; import static java.lang.Math.round; @@ -2369,14 +2370,10 @@ private void recordFinishStats() { long finishTime = System.currentTimeMillis(); long nonSpeculativeSwitchTime = this.nonSpeculativeSwitchTime.orElse(finishTime); - - double speculativeExecutionFraction = ((double) nonSpeculativeSwitchTime - (double) startTime) / ((double) finishTime - (double) startTime); - if (Double.isFinite(speculativeExecutionFraction)) { - stageExecutionStats.recordStageSpeculativeExecutionFraction(speculativeExecutionFraction); - } - else { - stageExecutionStats.recordStageSpeculativeExecutionFraction(1.0); - } + stageExecutionStats.recordStageSpeculativeExecutionFraction(clamp( + ((double) nonSpeculativeSwitchTime - startTime) / (finishTime - startTime), + 0.0, + 1.0)); } private void updateOutputSize(SpoolingOutputStats.Snapshot taskOutputStats) From cc2b732ad5f5dda4914a6d5053bdb8f2cc4703ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Wed, 22 May 2024 17:50:09 +0200 Subject: [PATCH 3/4] Use nanoTime for computing speculative execution fraction --- .../EventDrivenFaultTolerantQueryScheduler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 54368ac10709..657f6737c543 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -1923,7 +1923,7 @@ public static class StageExecution private boolean taskDescriptorLoadingActive; private boolean exchangeClosed; - private final long startTime = System.currentTimeMillis(); + private final long startTime = System.nanoTime(); private OptionalLong nonSpeculativeSwitchTime; private MemoryRequirements initialMemoryRequirements; @@ -1957,7 +1957,7 @@ private StageExecution( this.schedulingPriority = schedulingPriority; this.eager = eager; this.speculative = speculative; - this.nonSpeculativeSwitchTime = speculative ? OptionalLong.empty() : OptionalLong.of(System.currentTimeMillis()); + this.nonSpeculativeSwitchTime = speculative ? OptionalLong.empty() : OptionalLong.of(System.nanoTime()); this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null"); outputDataSize = new long[sinkPartitioningScheme.getPartitionCount()]; sinkOutputSelectorBuilder = ExchangeSourceOutputSelector.builder(ImmutableSet.of(exchange.getId())); @@ -2034,7 +2034,7 @@ public void setSpeculative(boolean speculative) { checkArgument(!speculative || this.speculative, "cannot mark non-speculative stage as speculative"); if (this.speculative && !speculative) { - nonSpeculativeSwitchTime = OptionalLong.of(System.currentTimeMillis()); + nonSpeculativeSwitchTime = OptionalLong.of(System.nanoTime()); } this.speculative = speculative; } @@ -2368,7 +2368,7 @@ private void finish() private void recordFinishStats() { - long finishTime = System.currentTimeMillis(); + long finishTime = System.nanoTime(); long nonSpeculativeSwitchTime = this.nonSpeculativeSwitchTime.orElse(finishTime); stageExecutionStats.recordStageSpeculativeExecutionFraction(clamp( ((double) nonSpeculativeSwitchTime - startTime) / (finishTime - startTime), From 88e432c745bf6430f177ad6c84049f9ae008554b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Wed, 22 May 2024 17:52:09 +0200 Subject: [PATCH 4/4] Move field initialization for clarity --- .../EventDrivenFaultTolerantQueryScheduler.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java index 657f6737c543..bf3dbaa786a7 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/EventDrivenFaultTolerantQueryScheduler.java @@ -1923,7 +1923,7 @@ public static class StageExecution private boolean taskDescriptorLoadingActive; private boolean exchangeClosed; - private final long startTime = System.nanoTime(); + private final long startTime; private OptionalLong nonSpeculativeSwitchTime; private MemoryRequirements initialMemoryRequirements; @@ -1957,7 +1957,8 @@ private StageExecution( this.schedulingPriority = schedulingPriority; this.eager = eager; this.speculative = speculative; - this.nonSpeculativeSwitchTime = speculative ? OptionalLong.empty() : OptionalLong.of(System.nanoTime()); + this.startTime = System.nanoTime(); + this.nonSpeculativeSwitchTime = speculative ? OptionalLong.empty() : OptionalLong.of(startTime); this.dynamicFilterService = requireNonNull(dynamicFilterService, "dynamicFilterService is null"); outputDataSize = new long[sinkPartitioningScheme.getPartitionCount()]; sinkOutputSelectorBuilder = ExchangeSourceOutputSelector.builder(ImmutableSet.of(exchange.getId()));