Skip to content

Commit

Permalink
Add progress bar support for fault tolerant execution
Browse files Browse the repository at this point in the history
  • Loading branch information
linzebing committed Apr 4, 2023
1 parent f61f5e5 commit 28150ed
Show file tree
Hide file tree
Showing 23 changed files with 231 additions and 62 deletions.
28 changes: 13 additions & 15 deletions client/trino-cli/src/main/java/io/trino/cli/FormatUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,39 +196,37 @@ public static String formatProgressBar(int width, int tick)
repeat(" ", width - (lower + markerWidth));
}

public static String formatProgressBar(int width, int complete, int running, int total)
public static String formatProgressBar(int width, int progressPercentage, int runningPercentage)
{
if (total == 0) {
return repeat(" ", width);
}
int totalPercentage = 100;

int pending = max(0, total - complete - running);
int pending = max(0, totalPercentage - progressPercentage - runningPercentage);

// compute nominal lengths
int completeLength = min(width, ceil(complete * width, total));
int pendingLength = min(width, ceil(pending * width, total));
int completeLength = min(width, ceil(progressPercentage * width, totalPercentage));
int pendingLength = min(width, ceil(pending * width, totalPercentage));

// leave space for at least one ">" as long as running is > 0
int minRunningLength = (running > 0) ? 1 : 0;
int runningLength = max(min(width, ceil(running * width, total)), minRunningLength);
// leave space for at least one ">" as long as runningPercentage is > 0
int minRunningLength = (runningPercentage > 0) ? 1 : 0;
int runningLength = max(min(width, ceil(runningPercentage * width, totalPercentage)), minRunningLength);

// adjust to fix rounding errors
if (((completeLength + runningLength + pendingLength) != width) && (pending > 0)) {
// sacrifice "pending" if we're over the max width
pendingLength = max(0, width - completeLength - runningLength);
}
if ((completeLength + runningLength + pendingLength) != width) {
// then, sacrifice "running"
// then, sacrifice "runningPercentage"
runningLength = max(minRunningLength, width - completeLength - pendingLength);
}
if (((completeLength + runningLength + pendingLength) > width) && (complete > 0)) {
// finally, sacrifice "complete" if we're still over the limit
if (((completeLength + runningLength + pendingLength) > width) && (progressPercentage > 0)) {
// finally, sacrifice "progressPercentage" if we're still over the limit
completeLength = max(0, width - runningLength - pendingLength);
}

checkState((completeLength + runningLength + pendingLength) == width,
"Expected completeLength (%s) + runningLength (%s) + pendingLength (%s) == width (%s), was %s for complete = %s, running = %s, total = %s",
completeLength, runningLength, pendingLength, width, completeLength + runningLength + pendingLength, complete, running, total);
"Expected completeLength (%s) + runningLength (%s) + pendingLength (%s) == width (%s), was %s for progressPercentage = %s, runningPercentage = %s, totalPercentage = %s",
completeLength, runningLength, pendingLength, width, completeLength + runningLength + pendingLength, progressPercentage, runningPercentage, totalPercentage);

return repeat("=", completeLength) + repeat(">", runningLength) + repeat(" ", pendingLength);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static io.trino.cli.TerminalUtils.isRealTerminal;
import static io.trino.cli.TerminalUtils.terminalWidth;
import static java.lang.Character.toUpperCase;
import static java.lang.Math.ceil;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.String.format;
Expand Down Expand Up @@ -324,10 +325,10 @@ private void printQueryInfo(QueryStatusInfo results, WarningsPrinter warningsPri
int progressWidth = (min(terminalWidth, 100) - 75) + 17; // progress bar is 17-42 characters wide

if (stats.isScheduled()) {
String progressBar = formatProgressBar(progressWidth,
stats.getCompletedSplits(),
max(0, stats.getRunningSplits()),
stats.getTotalSplits());
String progressBar = formatProgressBar(
progressWidth,
progressPercentage,
(int) ceil(stats.getRunningPercentage().orElse(0.0)));

// 0:17 [ 103MB, 802K rows] [5.74MB/s, 44.9K rows/s] [=====>> ] 10%
String progressLine = format("%s [%5s rows, %6s] [%5s rows/s, %8s] [%s] %d%%",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.time.ZoneId;
import java.util.Locale;
import java.util.Optional;
import java.util.OptionalDouble;

import static com.google.common.io.ByteStreams.nullOutputStream;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
Expand Down Expand Up @@ -125,7 +126,11 @@ static String createResults(MockWebServer server)
null,
ImmutableList.of(new Column("_col0", BIGINT, new ClientTypeSignature(BIGINT))),
ImmutableList.of(ImmutableList.of(123)),
StatementStats.builder().setState("FINISHED").build(),
StatementStats.builder()
.setState("FINISHED")
.setProgressPercentage(OptionalDouble.empty())
.setRunningPercentage(OptionalDouble.empty())
.build(),
//new StatementStats("FINISHED", false, true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null),
null,
ImmutableList.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.OptionalDouble;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;

@Immutable
Expand All @@ -31,6 +30,8 @@ public class StatementStats
private final String state;
private final boolean queued;
private final boolean scheduled;
private final OptionalDouble progressPercentage;
private final OptionalDouble runningPercentage;
private final int nodes;
private final int totalSplits;
private final int queuedSplits;
Expand All @@ -52,6 +53,8 @@ public StatementStats(
@JsonProperty("state") String state,
@JsonProperty("queued") boolean queued,
@JsonProperty("scheduled") boolean scheduled,
@JsonProperty("progressPercentage") OptionalDouble progressPercentage,
@JsonProperty("runningPercentage") OptionalDouble runningPercentage,
@JsonProperty("nodes") int nodes,
@JsonProperty("totalSplits") int totalSplits,
@JsonProperty("queuedSplits") int queuedSplits,
Expand All @@ -71,6 +74,8 @@ public StatementStats(
this.state = requireNonNull(state, "state is null");
this.queued = queued;
this.scheduled = scheduled;
this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null");
this.runningPercentage = requireNonNull(runningPercentage, "runningPercentage is null");
this.nodes = nodes;
this.totalSplits = totalSplits;
this.queuedSplits = queuedSplits;
Expand Down Expand Up @@ -106,6 +111,18 @@ public boolean isScheduled()
return scheduled;
}

@JsonProperty
public OptionalDouble getProgressPercentage()
{
return progressPercentage;
}

@JsonProperty
public OptionalDouble getRunningPercentage()
{
return runningPercentage;
}

@JsonProperty
public int getNodes()
{
Expand Down Expand Up @@ -191,15 +208,6 @@ public StageStats getRootStage()
return rootStage;
}

@JsonProperty
public OptionalDouble getProgressPercentage()
{
if (!scheduled || totalSplits == 0) {
return OptionalDouble.empty();
}
return OptionalDouble.of(min(100, (completedSplits * 100.0) / totalSplits));
}

@JsonProperty
public long getSpilledBytes()
{
Expand All @@ -213,6 +221,8 @@ public String toString()
.add("state", state)
.add("queued", queued)
.add("scheduled", scheduled)
.add("progressPercentage", progressPercentage)
.add("runningPercentage", runningPercentage)
.add("nodes", nodes)
.add("totalSplits", totalSplits)
.add("queuedSplits", queuedSplits)
Expand Down Expand Up @@ -241,6 +251,8 @@ public static class Builder
private String state;
private boolean queued;
private boolean scheduled;
private OptionalDouble progressPercentage;
private OptionalDouble runningPercentage;
private int nodes;
private int totalSplits;
private int queuedSplits;
Expand Down Expand Up @@ -283,6 +295,18 @@ public Builder setScheduled(boolean scheduled)
return this;
}

public Builder setProgressPercentage(OptionalDouble progressPercentage)
{
this.progressPercentage = progressPercentage;
return this;
}

public Builder setRunningPercentage(OptionalDouble runningPercentage)
{
this.runningPercentage = runningPercentage;
return this;
}

public Builder setTotalSplits(int totalSplits)
{
this.totalSplits = totalSplits;
Expand Down Expand Up @@ -373,6 +397,8 @@ public StatementStats build()
state,
queued,
scheduled,
progressPercentage,
runningPercentage,
nodes,
totalSplits,
queuedSplits,
Expand Down
18 changes: 9 additions & 9 deletions client/trino-jdbc/src/main/java/io/trino/jdbc/QueryStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Optional;
import java.util.OptionalDouble;

import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;

public final class QueryStats
Expand All @@ -27,6 +26,7 @@ public final class QueryStats
private final String state;
private final boolean queued;
private final boolean scheduled;
private final OptionalDouble progressPercentage;
private final int nodes;
private final int totalSplits;
private final int queuedSplits;
Expand All @@ -46,6 +46,7 @@ public QueryStats(
String state,
boolean queued,
boolean scheduled,
OptionalDouble progressPercentage,
int nodes,
int totalSplits,
int queuedSplits,
Expand All @@ -64,6 +65,7 @@ public QueryStats(
this.state = requireNonNull(state, "state is null");
this.queued = queued;
this.scheduled = scheduled;
this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null");
this.nodes = nodes;
this.totalSplits = totalSplits;
this.queuedSplits = queuedSplits;
Expand All @@ -86,6 +88,7 @@ static QueryStats create(String queryId, StatementStats stats)
stats.getState(),
stats.isQueued(),
stats.isScheduled(),
stats.getProgressPercentage(),
stats.getNodes(),
stats.getTotalSplits(),
stats.getQueuedSplits(),
Expand Down Expand Up @@ -121,6 +124,11 @@ public boolean isScheduled()
return scheduled;
}

public OptionalDouble getProgressPercentage()
{
return progressPercentage;
}

public int getNodes()
{
return nodes;
Expand Down Expand Up @@ -185,12 +193,4 @@ public Optional<StageStats> getRootStage()
{
return rootStage;
}

public OptionalDouble getProgressPercentage()
{
if (!scheduled || totalSplits == 0) {
return OptionalDouble.empty();
}
return OptionalDouble.of(min(100, (completedSplits * 100.0) / totalSplits));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.OptionalDouble;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -91,7 +92,7 @@ private String newQueryResults(Integer partialCancelId, Integer nextUriId, List<
nextUriId == null ? null : server.url(format("/v1/statement/%s/%s", queryId, nextUriId)).uri(),
responseColumns,
data,
new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null),
new StatementStats(state, state.equals("QUEUED"), true, OptionalDouble.of(0), OptionalDouble.of(0), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null),
null,
ImmutableList.of(),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.net.URI;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.concurrent.Executor;

import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
Expand Down Expand Up @@ -288,6 +289,8 @@ private static QueryStats immediateFailureQueryStats()
DataSize.ofBytes(0),
DataSize.ofBytes(0),
false,
OptionalDouble.empty(),
OptionalDouble.empty(),
new Duration(0, MILLISECONDS),
new Duration(0, MILLISECONDS),
new Duration(0, MILLISECONDS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import java.net.URI;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -282,6 +283,8 @@ private static QueryResults createQueryResults(
StatementStats.builder()
.setState(state.toString())
.setQueued(state == QUEUED)
.setProgressPercentage(OptionalDouble.empty())
.setRunningPercentage(OptionalDouble.empty())
.setElapsedTimeMillis(elapsedTime.toMillis())
.setQueuedTimeMillis(queuedTime.toMillis())
.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class BasicStageStats
false,
ImmutableSet.of(),

OptionalDouble.empty(),
OptionalDouble.empty());

private final boolean isScheduled;
Expand All @@ -88,6 +89,7 @@ public class BasicStageStats
private final boolean fullyBlocked;
private final Set<BlockedReason> blockedReasons;
private final OptionalDouble progressPercentage;
private final OptionalDouble runningPercentage;

public BasicStageStats(
boolean isScheduled,
Expand Down Expand Up @@ -122,7 +124,8 @@ public BasicStageStats(
boolean fullyBlocked,
Set<BlockedReason> blockedReasons,

OptionalDouble progressPercentage)
OptionalDouble progressPercentage,
OptionalDouble runningPercentage)
{
this.isScheduled = isScheduled;
this.failedTasks = failedTasks;
Expand All @@ -148,6 +151,7 @@ public BasicStageStats(
this.fullyBlocked = fullyBlocked;
this.blockedReasons = ImmutableSet.copyOf(requireNonNull(blockedReasons, "blockedReasons is null"));
this.progressPercentage = requireNonNull(progressPercentage, "progressPercentage is null");
this.runningPercentage = requireNonNull(runningPercentage, "runningPerentage is null");
}

public boolean isScheduled()
Expand Down Expand Up @@ -270,6 +274,11 @@ public OptionalDouble getProgressPercentage()
return progressPercentage;
}

public OptionalDouble getRunningPercentage()
{
return runningPercentage;
}

public static BasicStageStats aggregateBasicStageStats(Iterable<BasicStageStats> stages)
{
int failedTasks = 0;
Expand Down Expand Up @@ -342,6 +351,10 @@ public static BasicStageStats aggregateBasicStageStats(Iterable<BasicStageStats>
if (isScheduled && totalDrivers != 0) {
progressPercentage = OptionalDouble.of(min(100, (completedDrivers * 100.0) / totalDrivers));
}
OptionalDouble runningPercentage = OptionalDouble.empty();
if (isScheduled && totalDrivers != 0) {
runningPercentage = OptionalDouble.of(min(100, (runningDrivers * 100.0) / totalDrivers));
}

return new BasicStageStats(
isScheduled,
Expand Down Expand Up @@ -376,6 +389,7 @@ public static BasicStageStats aggregateBasicStageStats(Iterable<BasicStageStats>
fullyBlocked,
blockedReasons,

progressPercentage);
progressPercentage,
runningPercentage);
}
}
Loading

0 comments on commit 28150ed

Please sign in to comment.