diff --git a/client/trino-cli/src/main/java/io/trino/cli/FormatUtils.java b/client/trino-cli/src/main/java/io/trino/cli/FormatUtils.java index 3c70452a37b5..ba032364450a 100644 --- a/client/trino-cli/src/main/java/io/trino/cli/FormatUtils.java +++ b/client/trino-cli/src/main/java/io/trino/cli/FormatUtils.java @@ -196,21 +196,19 @@ public static String formatProgressBar(int width, int tick) repeat(" ", width - (lower + markerWidth)); } - public static String formatProgressBar(int width, int complete, int running, int total) + public static String formatProgressBar(int width, int progressPercentage, int runningPercentage) { - if (total == 0) { - return repeat(" ", width); - } + int totalPercentage = 100; - int pending = max(0, total - complete - running); + int pending = max(0, totalPercentage - progressPercentage - runningPercentage); // compute nominal lengths - int completeLength = min(width, ceil(complete * width, total)); - int pendingLength = min(width, ceil(pending * width, total)); + int completeLength = min(width, ceil(progressPercentage * width, totalPercentage)); + int pendingLength = min(width, ceil(pending * width, totalPercentage)); - // leave space for at least one ">" as long as running is > 0 - int minRunningLength = (running > 0) ? 1 : 0; - int runningLength = max(min(width, ceil(running * width, total)), minRunningLength); + // leave space for at least one ">" as long as runningPercentage is > 0 + int minRunningLength = (runningPercentage > 0) ? 1 : 0; + int runningLength = max(min(width, ceil(runningPercentage * width, totalPercentage)), minRunningLength); // adjust to fix rounding errors if (((completeLength + runningLength + pendingLength) != width) && (pending > 0)) { @@ -218,17 +216,17 @@ public static String formatProgressBar(int width, int complete, int running, int pendingLength = max(0, width - completeLength - runningLength); } if ((completeLength + runningLength + pendingLength) != width) { - // then, sacrifice "running" + // then, sacrifice "runningPercentage" runningLength = max(minRunningLength, width - completeLength - pendingLength); } - if (((completeLength + runningLength + pendingLength) > width) && (complete > 0)) { - // finally, sacrifice "complete" if we're still over the limit + if (((completeLength + runningLength + pendingLength) > width) && (progressPercentage > 0)) { + // finally, sacrifice "progressPercentage" if we're still over the limit completeLength = max(0, width - runningLength - pendingLength); } checkState((completeLength + runningLength + pendingLength) == width, - "Expected completeLength (%s) + runningLength (%s) + pendingLength (%s) == width (%s), was %s for complete = %s, running = %s, total = %s", - completeLength, runningLength, pendingLength, width, completeLength + runningLength + pendingLength, complete, running, total); + "Expected completeLength (%s) + runningLength (%s) + pendingLength (%s) == width (%s), was %s for progressPercentage = %s, runningPercentage = %s, totalPercentage = %s", + completeLength, runningLength, pendingLength, width, completeLength + runningLength + pendingLength, progressPercentage, runningPercentage, totalPercentage); return repeat("=", completeLength) + repeat(">", runningLength) + repeat(" ", pendingLength); } diff --git a/client/trino-cli/src/main/java/io/trino/cli/StatusPrinter.java b/client/trino-cli/src/main/java/io/trino/cli/StatusPrinter.java index 373a64518778..39bb1f0885b6 100644 --- a/client/trino-cli/src/main/java/io/trino/cli/StatusPrinter.java +++ b/client/trino-cli/src/main/java/io/trino/cli/StatusPrinter.java @@ -44,6 +44,7 @@ import static io.trino.cli.TerminalUtils.isRealTerminal; import static io.trino.cli.TerminalUtils.terminalWidth; import static java.lang.Character.toUpperCase; +import static java.lang.Math.ceil; import static java.lang.Math.max; import static java.lang.Math.min; import static java.lang.String.format; @@ -324,10 +325,10 @@ private void printQueryInfo(QueryStatusInfo results, WarningsPrinter warningsPri int progressWidth = (min(terminalWidth, 100) - 75) + 17; // progress bar is 17-42 characters wide if (stats.isScheduled()) { - String progressBar = formatProgressBar(progressWidth, - stats.getCompletedSplits(), - max(0, stats.getRunningSplits()), - stats.getTotalSplits()); + String progressBar = formatProgressBar( + progressWidth, + progressPercentage, + (int) ceil(stats.getRunningPercentage().orElse(0.0))); // 0:17 [ 103MB, 802K rows] [5.74MB/s, 44.9K rows/s] [=====>> ] 10% String progressLine = format("%s [%5s rows, %6s] [%5s rows/s, %8s] [%s] %d%%", diff --git a/client/trino-cli/src/test/java/io/trino/cli/TestQueryRunner.java b/client/trino-cli/src/test/java/io/trino/cli/TestQueryRunner.java index b42942004830..443416a3f3ea 100644 --- a/client/trino-cli/src/test/java/io/trino/cli/TestQueryRunner.java +++ b/client/trino-cli/src/test/java/io/trino/cli/TestQueryRunner.java @@ -33,6 +33,7 @@ import java.time.ZoneId; import java.util.Locale; import java.util.Optional; +import java.util.OptionalDouble; import static com.google.common.io.ByteStreams.nullOutputStream; import static com.google.common.net.HttpHeaders.CONTENT_TYPE; @@ -125,7 +126,11 @@ static String createResults(MockWebServer server) null, ImmutableList.of(new Column("_col0", BIGINT, new ClientTypeSignature(BIGINT))), ImmutableList.of(ImmutableList.of(123)), - StatementStats.builder().setState("FINISHED").build(), + StatementStats.builder() + .setState("FINISHED") + .setProgressPercentage(OptionalDouble.empty()) + .setRunningPercentage(OptionalDouble.empty()) + .build(), //new StatementStats("FINISHED", false, true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null), null, ImmutableList.of(), diff --git a/client/trino-client/src/main/java/io/trino/client/StatementStats.java b/client/trino-client/src/main/java/io/trino/client/StatementStats.java index 93e02af9ddf9..fd7e44fac3de 100644 --- a/client/trino-client/src/main/java/io/trino/client/StatementStats.java +++ b/client/trino-client/src/main/java/io/trino/client/StatementStats.java @@ -22,7 +22,6 @@ import java.util.OptionalDouble; import static com.google.common.base.MoreObjects.toStringHelper; -import static java.lang.Math.min; import static java.util.Objects.requireNonNull; @Immutable @@ -31,6 +30,8 @@ public class StatementStats private final String state; private final boolean queued; private final boolean scheduled; + private final OptionalDouble progressPercentage; + private final OptionalDouble runningPercentage; private final int nodes; private final int totalSplits; private final int queuedSplits; @@ -52,6 +53,8 @@ public StatementStats( @JsonProperty("state") String state, @JsonProperty("queued") boolean queued, @JsonProperty("scheduled") boolean scheduled, + @JsonProperty("progressPercentage") OptionalDouble progressPercentage, + @JsonProperty("runningPercentage") OptionalDouble runningPercentage, @JsonProperty("nodes") int nodes, @JsonProperty("totalSplits") int totalSplits, @JsonProperty("queuedSplits") int queuedSplits, @@ -71,6 +74,8 @@ public StatementStats( this.state = requireNonNull(state, "state is null"); this.queued = queued; this.scheduled = scheduled; + this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null"); + this.runningPercentage = requireNonNull(runningPercentage, "runningPercentage is null"); this.nodes = nodes; this.totalSplits = totalSplits; this.queuedSplits = queuedSplits; @@ -106,6 +111,18 @@ public boolean isScheduled() return scheduled; } + @JsonProperty + public OptionalDouble getProgressPercentage() + { + return progressPercentage; + } + + @JsonProperty + public OptionalDouble getRunningPercentage() + { + return runningPercentage; + } + @JsonProperty public int getNodes() { @@ -191,15 +208,6 @@ public StageStats getRootStage() return rootStage; } - @JsonProperty - public OptionalDouble getProgressPercentage() - { - if (!scheduled || totalSplits == 0) { - return OptionalDouble.empty(); - } - return OptionalDouble.of(min(100, (completedSplits * 100.0) / totalSplits)); - } - @JsonProperty public long getSpilledBytes() { @@ -213,6 +221,8 @@ public String toString() .add("state", state) .add("queued", queued) .add("scheduled", scheduled) + .add("progressPercentage", progressPercentage) + .add("runningPercentage", runningPercentage) .add("nodes", nodes) .add("totalSplits", totalSplits) .add("queuedSplits", queuedSplits) @@ -241,6 +251,8 @@ public static class Builder private String state; private boolean queued; private boolean scheduled; + private OptionalDouble progressPercentage; + private OptionalDouble runningPercentage; private int nodes; private int totalSplits; private int queuedSplits; @@ -283,6 +295,18 @@ public Builder setScheduled(boolean scheduled) return this; } + public Builder setProgressPercentage(OptionalDouble progressPercentage) + { + this.progressPercentage = progressPercentage; + return this; + } + + public Builder setRunningPercentage(OptionalDouble runningPercentage) + { + this.runningPercentage = runningPercentage; + return this; + } + public Builder setTotalSplits(int totalSplits) { this.totalSplits = totalSplits; @@ -373,6 +397,8 @@ public StatementStats build() state, queued, scheduled, + progressPercentage, + runningPercentage, nodes, totalSplits, queuedSplits, diff --git a/client/trino-jdbc/src/main/java/io/trino/jdbc/QueryStats.java b/client/trino-jdbc/src/main/java/io/trino/jdbc/QueryStats.java index 33efda892018..d09f3297de66 100644 --- a/client/trino-jdbc/src/main/java/io/trino/jdbc/QueryStats.java +++ b/client/trino-jdbc/src/main/java/io/trino/jdbc/QueryStats.java @@ -18,7 +18,6 @@ import java.util.Optional; import java.util.OptionalDouble; -import static java.lang.Math.min; import static java.util.Objects.requireNonNull; public final class QueryStats @@ -27,6 +26,7 @@ public final class QueryStats private final String state; private final boolean queued; private final boolean scheduled; + private final OptionalDouble progressPercentage; private final int nodes; private final int totalSplits; private final int queuedSplits; @@ -46,6 +46,7 @@ public QueryStats( String state, boolean queued, boolean scheduled, + OptionalDouble progressPercentage, int nodes, int totalSplits, int queuedSplits, @@ -64,6 +65,7 @@ public QueryStats( this.state = requireNonNull(state, "state is null"); this.queued = queued; this.scheduled = scheduled; + this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null"); this.nodes = nodes; this.totalSplits = totalSplits; this.queuedSplits = queuedSplits; @@ -86,6 +88,7 @@ static QueryStats create(String queryId, StatementStats stats) stats.getState(), stats.isQueued(), stats.isScheduled(), + stats.getProgressPercentage(), stats.getNodes(), stats.getTotalSplits(), stats.getQueuedSplits(), @@ -121,6 +124,11 @@ public boolean isScheduled() return scheduled; } + public OptionalDouble getProgressPercentage() + { + return progressPercentage; + } + public int getNodes() { return nodes; @@ -185,12 +193,4 @@ public Optional getRootStage() { return rootStage; } - - public OptionalDouble getProgressPercentage() - { - if (!scheduled || totalSplits == 0) { - return OptionalDouble.empty(); - } - return OptionalDouble.of(min(100, (completedSplits * 100.0) / totalSplits)); - } } diff --git a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestProgressMonitor.java b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestProgressMonitor.java index 208a17a1bc83..b9c06a450bf7 100644 --- a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestProgressMonitor.java +++ b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestProgressMonitor.java @@ -34,6 +34,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.List; +import java.util.OptionalDouble; import java.util.function.Consumer; import static com.google.common.base.Preconditions.checkState; @@ -91,7 +92,7 @@ private String newQueryResults(Integer partialCancelId, Integer nextUriId, List< nextUriId == null ? null : server.url(format("/v1/statement/%s/%s", queryId, nextUriId)).uri(), responseColumns, data, - new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null), + new StatementStats(state, state.equals("QUEUED"), true, OptionalDouble.of(0), OptionalDouble.of(0), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null), null, ImmutableList.of(), null, diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java index b4df9cfa2184..6ab222b07426 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java @@ -35,6 +35,7 @@ import java.net.URI; import java.util.Optional; +import java.util.OptionalDouble; import java.util.concurrent.Executor; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; @@ -288,6 +289,8 @@ private static QueryStats immediateFailureQueryStats() DataSize.ofBytes(0), DataSize.ofBytes(0), false, + OptionalDouble.empty(), + OptionalDouble.empty(), new Duration(0, MILLISECONDS), new Duration(0, MILLISECONDS), new Duration(0, MILLISECONDS), diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/QueuedStatementResource.java b/core/trino-main/src/main/java/io/trino/dispatcher/QueuedStatementResource.java index 861b39aaef2c..d413af170472 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/QueuedStatementResource.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/QueuedStatementResource.java @@ -64,6 +64,7 @@ import java.net.URI; import java.util.Optional; +import java.util.OptionalDouble; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -282,6 +283,8 @@ private static QueryResults createQueryResults( StatementStats.builder() .setState(state.toString()) .setQueued(state == QUEUED) + .setProgressPercentage(OptionalDouble.empty()) + .setRunningPercentage(OptionalDouble.empty()) .setElapsedTimeMillis(elapsedTime.toMillis()) .setQueuedTimeMillis(queuedTime.toMillis()) .build(), diff --git a/core/trino-main/src/main/java/io/trino/execution/BasicStageStats.java b/core/trino-main/src/main/java/io/trino/execution/BasicStageStats.java index 6d62bdb96be1..af2dbfcd3cc4 100644 --- a/core/trino-main/src/main/java/io/trino/execution/BasicStageStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/BasicStageStats.java @@ -62,6 +62,7 @@ public class BasicStageStats false, ImmutableSet.of(), + OptionalDouble.empty(), OptionalDouble.empty()); private final boolean isScheduled; @@ -88,6 +89,7 @@ public class BasicStageStats private final boolean fullyBlocked; private final Set blockedReasons; private final OptionalDouble progressPercentage; + private final OptionalDouble runningPercentage; public BasicStageStats( boolean isScheduled, @@ -122,7 +124,8 @@ public BasicStageStats( boolean fullyBlocked, Set blockedReasons, - OptionalDouble progressPercentage) + OptionalDouble progressPercentage, + OptionalDouble runningPercentage) { this.isScheduled = isScheduled; this.failedTasks = failedTasks; @@ -148,6 +151,7 @@ public BasicStageStats( this.fullyBlocked = fullyBlocked; this.blockedReasons = ImmutableSet.copyOf(requireNonNull(blockedReasons, "blockedReasons is null")); this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null"); + this.runningPercentage = requireNonNull(runningPercentage, "runningPerentage is null"); } public boolean isScheduled() @@ -270,6 +274,11 @@ public OptionalDouble getProgressPercentage() return progressPercentage; } + public OptionalDouble getRunningPercentage() + { + return runningPercentage; + } + public static BasicStageStats aggregateBasicStageStats(Iterable stages) { int failedTasks = 0; @@ -342,6 +351,10 @@ public static BasicStageStats aggregateBasicStageStats(Iterable if (isScheduled && totalDrivers != 0) { progressPercentage = OptionalDouble.of(min(100, (completedDrivers * 100.0) / totalDrivers)); } + OptionalDouble runningPercentage = OptionalDouble.empty(); + if (isScheduled && totalDrivers != 0) { + runningPercentage = OptionalDouble.of(min(100, (runningDrivers * 100.0) / totalDrivers)); + } return new BasicStageStats( isScheduled, @@ -376,6 +389,7 @@ public static BasicStageStats aggregateBasicStageStats(Iterable fullyBlocked, blockedReasons, - progressPercentage); + progressPercentage, + runningPercentage); } } diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryInfo.java b/core/trino-main/src/main/java/io/trino/execution/QueryInfo.java index c5f1a1500307..f186e0836bef 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryInfo.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryInfo.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalDouble; import java.util.Set; import static com.google.common.base.MoreObjects.toStringHelper; @@ -208,6 +209,18 @@ public boolean isScheduled() return queryStats.isScheduled(); } + @JsonProperty + public OptionalDouble getProgressPercentage() + { + return queryStats.getProgressPercentage(); + } + + @JsonProperty + public OptionalDouble getRunningPercentage() + { + return queryStats.getRunningPercentage(); + } + @JsonProperty public URI getSelf() { diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index a347b7c67c17..7d910375ccdd 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -60,12 +60,14 @@ import javax.annotation.concurrent.ThreadSafe; import java.net.URI; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalDouble; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -101,6 +103,7 @@ import static io.trino.util.Ciphers.createRandomAesEncryptionKey; import static io.trino.util.Ciphers.serializeAesEncryptionKey; import static io.trino.util.Failures.toFailure; +import static java.lang.Math.min; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -446,7 +449,8 @@ public BasicQueryInfo getBasicQueryInfo(Optional rootStage) stageStats.isFullyBlocked(), stageStats.getBlockedReasons(), - stageStats.getProgressPercentage()); + stageStats.getProgressPercentage(), + stageStats.getRunningPercentage()); return new BasicQueryInfo( queryId, @@ -665,9 +669,52 @@ private QueryStats getQueryStats(Optional rootStage, List failedOutputPositions += outputStageStats.getFailedOutputPositions(); } - boolean isScheduled = rootStage.isPresent() && allStages.stream() - .map(StageInfo::getState) - .allMatch(state -> state == StageState.RUNNING || state == StageState.PENDING || state.isDone()); + boolean scheduled; + OptionalDouble progressPercentage; + OptionalDouble runningPercentage; + if (getRetryPolicy(session).equals(TASK)) { + // Unlike pipelined execution, fault tolerant execution doesn't execute stages all at + // once and some stages will be in PLANNED state in the middle of execution. + scheduled = rootStage.isPresent() && allStages.stream() + .map(StageInfo::getState) + .anyMatch(StageState::isScheduled); + if (!scheduled || totalDrivers == 0) { + progressPercentage = OptionalDouble.empty(); + runningPercentage = OptionalDouble.empty(); + } + else { + double completedPercentageSum = 0.0; + double runningPercentageSum = 0.0; + int totalStages = 0; + Queue queue = new ArrayDeque<>(); + queue.add(rootStage.get()); + while (!queue.isEmpty()) { + StageInfo stage = queue.poll(); + StageStats stageStats = stage.getStageStats(); + totalStages++; + if (stage.getState().isScheduled()) { + completedPercentageSum += 100.0 * stageStats.getCompletedDrivers() / stageStats.getTotalDrivers(); + runningPercentageSum += 100.0 * stageStats.getRunningDrivers() / stageStats.getTotalDrivers(); + } + queue.addAll(stage.getSubStages()); + } + progressPercentage = OptionalDouble.of(min(100, completedPercentageSum / totalStages)); + runningPercentage = OptionalDouble.of(min(100, runningPercentageSum / totalStages)); + } + } + else { + scheduled = rootStage.isPresent() && allStages.stream() + .map(StageInfo::getState) + .allMatch(StageState::isScheduled); + if (!scheduled || totalDrivers == 0) { + progressPercentage = OptionalDouble.empty(); + runningPercentage = OptionalDouble.empty(); + } + else { + progressPercentage = OptionalDouble.of(min(100, (completedDrivers * 100.0) / totalDrivers)); + runningPercentage = OptionalDouble.of(min(100, (runningDrivers * 100.0) / totalDrivers)); + } + } return new QueryStats( queryStateTimer.getCreateTime(), @@ -707,7 +754,9 @@ private QueryStats getQueryStats(Optional rootStage, List succinctBytes(getPeakTaskRevocableMemory()), succinctBytes(getPeakTaskTotalMemory()), - isScheduled, + scheduled, + progressPercentage, + runningPercentage, new Duration(totalScheduledTime, MILLISECONDS).convertToMostSuccinctTimeUnit(), new Duration(failedScheduledTime, MILLISECONDS).convertToMostSuccinctTimeUnit(), @@ -1290,6 +1339,8 @@ private static QueryStats pruneQueryStats(QueryStats queryStats) queryStats.getPeakTaskRevocableMemory(), queryStats.getPeakTaskTotalMemory(), queryStats.isScheduled(), + queryStats.getProgressPercentage(), + queryStats.getRunningPercentage(), queryStats.getTotalScheduledTime(), queryStats.getFailedScheduledTime(), queryStats.getTotalCpuTime(), diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java index a4e488a14f1d..68d7b41f5c17 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java @@ -35,7 +35,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.units.DataSize.succinctBytes; import static io.trino.server.DynamicFilterService.DynamicFiltersStats; -import static java.lang.Math.min; import static java.util.Objects.requireNonNull; public class QueryStats @@ -79,6 +78,8 @@ public class QueryStats private final DataSize peakTaskTotalMemory; private final boolean scheduled; + private final OptionalDouble progressPercentage; + private final OptionalDouble runningPercentage; private final Duration totalScheduledTime; private final Duration failedScheduledTime; private final Duration totalCpuTime; @@ -170,6 +171,8 @@ public QueryStats( @JsonProperty("peakTaskTotalMemory") DataSize peakTaskTotalMemory, @JsonProperty("scheduled") boolean scheduled, + @JsonProperty("progressPercentage") OptionalDouble progressPercentage, + @JsonProperty("runningPercentage") OptionalDouble runningPercentage, @JsonProperty("totalScheduledTime") Duration totalScheduledTime, @JsonProperty("failedScheduledTime") Duration failedScheduledTime, @JsonProperty("totalCpuTime") Duration totalCpuTime, @@ -267,6 +270,8 @@ public QueryStats( this.peakTaskRevocableMemory = requireNonNull(peakTaskRevocableMemory, "peakTaskRevocableMemory is null"); this.peakTaskTotalMemory = requireNonNull(peakTaskTotalMemory, "peakTaskTotalMemory is null"); this.scheduled = scheduled; + this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null"); + this.runningPercentage = requireNonNull(runningPercentage, "runningPercentage is null"); this.totalScheduledTime = requireNonNull(totalScheduledTime, "totalScheduledTime is null"); this.failedScheduledTime = requireNonNull(failedScheduledTime, "failedScheduledTime is null"); this.totalCpuTime = requireNonNull(totalCpuTime, "totalCpuTime is null"); @@ -528,6 +533,18 @@ public boolean isScheduled() return scheduled; } + @JsonProperty + public OptionalDouble getProgressPercentage() + { + return progressPercentage; + } + + @JsonProperty + public OptionalDouble getRunningPercentage() + { + return runningPercentage; + } + @JsonProperty public Duration getTotalScheduledTime() { @@ -781,15 +798,6 @@ public List getOptimizerRulesSummaries() return optimizerRulesSummaries; } - @JsonProperty - public OptionalDouble getProgressPercentage() - { - if (!scheduled || totalDrivers == 0) { - return OptionalDouble.empty(); - } - return OptionalDouble.of(min(100, (completedDrivers * 100.0) / totalDrivers)); - } - @JsonProperty public DataSize getSpilledDataSize() { diff --git a/core/trino-main/src/main/java/io/trino/execution/StageState.java b/core/trino-main/src/main/java/io/trino/execution/StageState.java index c543dd335cf7..cf88c8f7f49a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageState.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageState.java @@ -80,4 +80,12 @@ public boolean isFailure() { return failureState; } + + /** + * Is this a scheduled state + */ + public boolean isScheduled() + { + return this.equals(StageState.RUNNING) || this.equals(StageState.PENDING) || this.isDone(); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java index 6b5482330fe7..9cd84291110b 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java @@ -323,6 +323,10 @@ public BasicStageStats getBasicStageStats(Supplier> taskInfos if (isScheduled && totalDrivers != 0) { progressPercentage = OptionalDouble.of(min(100, (completedDrivers * 100.0) / totalDrivers)); } + OptionalDouble runningPercentage = OptionalDouble.empty(); + if (isScheduled && totalDrivers != 0) { + runningPercentage = OptionalDouble.of(min(100, (runningDrivers * 100.0) / totalDrivers)); + } return new BasicStageStats( isScheduled, @@ -357,7 +361,8 @@ public BasicStageStats getBasicStageStats(Supplier> taskInfos fullyBlocked, blockedReasons, - progressPercentage); + progressPercentage, + runningPercentage); } public StageInfo getStageInfo(Supplier> taskInfosSupplier) diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStats.java b/core/trino-main/src/main/java/io/trino/execution/StageStats.java index 8358f27efc92..d939b08c6940 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStats.java @@ -634,6 +634,10 @@ public BasicStageStats toBasicStageStats(StageState stageState) if (isScheduled && totalDrivers != 0) { progressPercentage = OptionalDouble.of(min(100, (completedDrivers * 100.0) / totalDrivers)); } + OptionalDouble runningPercentage = OptionalDouble.empty(); + if (isScheduled && totalDrivers != 0) { + progressPercentage = OptionalDouble.of(min(100, (runningDrivers * 100.0) / totalDrivers)); + } return new BasicStageStats( isScheduled, @@ -659,7 +663,8 @@ public BasicStageStats toBasicStageStats(StageState stageState) failedScheduledTime, fullyBlocked, blockedReasons, - progressPercentage); + progressPercentage, + runningPercentage); } public static StageStats createInitial() diff --git a/core/trino-main/src/main/java/io/trino/server/BasicQueryStats.java b/core/trino-main/src/main/java/io/trino/server/BasicQueryStats.java index 41b59e9175ae..3220161e4549 100644 --- a/core/trino-main/src/main/java/io/trino/server/BasicQueryStats.java +++ b/core/trino-main/src/main/java/io/trino/server/BasicQueryStats.java @@ -71,6 +71,7 @@ public class BasicQueryStats private final Set blockedReasons; private final OptionalDouble progressPercentage; + private final OptionalDouble runningPercentage; @JsonCreator public BasicQueryStats( @@ -99,7 +100,8 @@ public BasicQueryStats( @JsonProperty("failedScheduledTime") Duration failedScheduledTime, @JsonProperty("fullyBlocked") boolean fullyBlocked, @JsonProperty("blockedReasons") Set blockedReasons, - @JsonProperty("progressPercentage") OptionalDouble progressPercentage) + @JsonProperty("progressPercentage") OptionalDouble progressPercentage, + @JsonProperty("runningPercentage") OptionalDouble runningPercentage) { this.createTime = createTime; this.endTime = endTime; @@ -139,6 +141,7 @@ public BasicQueryStats( this.blockedReasons = ImmutableSet.copyOf(requireNonNull(blockedReasons, "blockedReasons is null")); this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null"); + this.runningPercentage = requireNonNull(runningPercentage, "runningPercentage is null"); } public BasicQueryStats(QueryStats queryStats) @@ -168,7 +171,8 @@ public BasicQueryStats(QueryStats queryStats) queryStats.getFailedScheduledTime(), queryStats.isFullyBlocked(), queryStats.getBlockedReasons(), - queryStats.getProgressPercentage()); + queryStats.getProgressPercentage(), + queryStats.getRunningPercentage()); } public static BasicQueryStats immediateFailureQueryStats() @@ -200,6 +204,7 @@ public static BasicQueryStats immediateFailureQueryStats() new Duration(0, MILLISECONDS), false, ImmutableSet.of(), + OptionalDouble.empty(), OptionalDouble.empty()); } @@ -358,4 +363,10 @@ public OptionalDouble getProgressPercentage() { return progressPercentage; } + + @JsonProperty + public OptionalDouble getRunningPercentage() + { + return runningPercentage; + } } diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/ProtocolUtil.java b/core/trino-main/src/main/java/io/trino/server/protocol/ProtocolUtil.java index a372bd3502c3..75bba65a5cc7 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/ProtocolUtil.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/ProtocolUtil.java @@ -179,6 +179,8 @@ public static StatementStats toStatementStats(QueryInfo queryInfo) .setState(queryInfo.getState().toString()) .setQueued(queryInfo.getState() == QueryState.QUEUED) .setScheduled(queryInfo.isScheduled()) + .setProgressPercentage(queryInfo.getProgressPercentage()) + .setRunningPercentage(queryInfo.getRunningPercentage()) .setNodes(globalUniqueNodes.size()) .setTotalSplits(queryStats.getTotalDrivers()) .setQueuedSplits(queryStats.getQueuedDrivers()) diff --git a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java index 639e7fffe0be..9f6c9a695340 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java @@ -145,6 +145,7 @@ public BasicQueryInfo getBasicQueryInfo() new Duration(23, NANOSECONDS), false, ImmutableSet.of(), + OptionalDouble.empty(), OptionalDouble.empty()), null, null, @@ -202,6 +203,8 @@ public QueryInfo getFullQueryInfo() DataSize.ofBytes(26), !state.isDone(), + state.isDone() ? OptionalDouble.empty() : OptionalDouble.of(8.88), + state.isDone() ? OptionalDouble.empty() : OptionalDouble.of(0), new Duration(20, NANOSECONDS), new Duration(21, NANOSECONDS), new Duration(22, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java index 20ccfa17e1ee..7b9394342bee 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java @@ -50,6 +50,8 @@ public void testQueryInfoRoundTrip() // Note: SessionRepresentation.equals? assertEquals(actual.getState(), expected.getState()); assertEquals(actual.isScheduled(), expected.isScheduled()); + assertEquals(actual.getProgressPercentage(), expected.getProgressPercentage()); + assertEquals(actual.getRunningPercentage(), expected.getRunningPercentage()); assertEquals(actual.getSelf(), expected.getSelf()); assertEquals(actual.getFieldNames(), expected.getFieldNames()); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java index 25894d2905a1..a35338c54592 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Optional; +import java.util.OptionalDouble; import static io.airlift.units.DataSize.succinctBytes; import static io.trino.server.DynamicFilterService.DynamicFiltersStats; @@ -213,6 +214,8 @@ public class TestQueryStats DataSize.ofBytes(27), true, + OptionalDouble.of(8.88), + OptionalDouble.of(0), new Duration(28, NANOSECONDS), new Duration(29, NANOSECONDS), new Duration(30, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java index 259989e7483a..80048a7f037c 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java @@ -90,6 +90,8 @@ public void testConstructor() DataSize.valueOf("30GB"), DataSize.valueOf("31GB"), true, + OptionalDouble.of(100), + OptionalDouble.of(0), new Duration(32, MINUTES), new Duration(33, MINUTES), new Duration(34, MINUTES), @@ -192,6 +194,7 @@ public void testConstructor() assertEquals(basicInfo.getQueryStats().getBlockedReasons(), ImmutableSet.of(BlockedReason.WAITING_FOR_MEMORY)); assertEquals(basicInfo.getQueryStats().getProgressPercentage(), OptionalDouble.of(100)); + assertEquals(basicInfo.getQueryStats().getRunningPercentage(), OptionalDouble.of(0)); assertEquals(basicInfo.getErrorCode(), StandardErrorCode.ABANDONED_QUERY.toErrorCode()); assertEquals(basicInfo.getErrorType(), StandardErrorCode.ABANDONED_QUERY.toErrorCode().getType()); diff --git a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java index abbf5893d3f1..a24a148365f3 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java @@ -32,6 +32,7 @@ import java.net.URI; import java.util.List; import java.util.Optional; +import java.util.OptionalDouble; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.airlift.units.DataSize.Unit.MEGABYTE; @@ -140,6 +141,8 @@ private QueryInfo createQueryInfo(String queryId, QueryState state, String query DataSize.valueOf("28GB"), DataSize.valueOf("29GB"), true, + OptionalDouble.of(8.88), + OptionalDouble.of(0), new Duration(23, MINUTES), new Duration(24, MINUTES), new Duration(25, MINUTES), diff --git a/testing/trino-tests/src/test/java/io/trino/memory/TestClusterMemoryLeakDetector.java b/testing/trino-tests/src/test/java/io/trino/memory/TestClusterMemoryLeakDetector.java index aa8e6fc9169c..6e7ad5525573 100644 --- a/testing/trino-tests/src/test/java/io/trino/memory/TestClusterMemoryLeakDetector.java +++ b/testing/trino-tests/src/test/java/io/trino/memory/TestClusterMemoryLeakDetector.java @@ -104,7 +104,8 @@ private static BasicQueryInfo createQueryInfo(String queryId, QueryState state) new Duration(33, MINUTES), true, ImmutableSet.of(WAITING_FOR_MEMORY), - OptionalDouble.of(20)), + OptionalDouble.of(20), + OptionalDouble.of(0)), null, null, Optional.empty(),