Skip to content

Commit

Permalink
Add blocked stats for input and output
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Mar 29, 2022
1 parent c461069 commit 93aba37
Show file tree
Hide file tree
Showing 22 changed files with 366 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,14 @@ private static QueryStats immediateFailureQueryStats()
DataSize.ofBytes(0),
0,
0,
new Duration(0, MILLISECONDS),
new Duration(0, MILLISECONDS),
DataSize.ofBytes(0),
DataSize.ofBytes(0),
0,
0,
new Duration(0, MILLISECONDS),
new Duration(0, MILLISECONDS),
DataSize.ofBytes(0),
DataSize.ofBytes(0),
ImmutableList.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
0,
0,
0,
Expand Down Expand Up @@ -270,6 +274,10 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
Optional.of(ofMillis(queryStats.getAnalysisTime().toMillis())),
Optional.of(ofMillis(queryStats.getPlanningTime().toMillis())),
Optional.of(ofMillis(queryStats.getExecutionTime().toMillis())),
Optional.of(ofMillis(queryStats.getInputBlockedTime().toMillis())),
Optional.of(ofMillis(queryStats.getFailedInputBlockedTime().toMillis())),
Optional.of(ofMillis(queryStats.getOutputBlockedTime().toMillis())),
Optional.of(ofMillis(queryStats.getFailedOutputBlockedTime().toMillis())),
queryStats.getPeakUserMemoryReservation().toBytes(),
queryStats.getPeakTaskUserMemory().toBytes(),
queryStats.getPeakTaskTotalMemory().toBytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import static io.trino.util.Failures.toFailure;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

@ThreadSafe
public class QueryStateMachine
Expand Down Expand Up @@ -515,11 +516,17 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
long processedInputPositions = 0;
long failedProcessedInputPositions = 0;

long inputBlockedTime = 0;
long failedInputBlockedTime = 0;

long outputDataSize = 0;
long failedOutputDataSize = 0;
long outputPositions = 0;
long failedOutputPositions = 0;

long outputBlockedTime = 0;
long failedOutputBlockedTime = 0;

long physicalWrittenDataSize = 0;
long failedPhysicalWrittenDataSize = 0;

Expand Down Expand Up @@ -583,6 +590,12 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
failedProcessedInputPositions += stageStats.getFailedProcessedInputPositions();
}

inputBlockedTime += stageStats.getInputBlockedTime().roundTo(NANOSECONDS);
failedInputBlockedTime += stageStats.getFailedInputBlockedTime().roundTo(NANOSECONDS);

outputBlockedTime += stageStats.getOutputBlockedTime().roundTo(NANOSECONDS);
failedOutputBlockedTime += stageStats.getFailedOutputBlockedTime().roundTo(NANOSECONDS);

physicalWrittenDataSize += stageStats.getPhysicalWrittenDataSize().toBytes();
failedPhysicalWrittenDataSize += stageStats.getFailedPhysicalWrittenDataSize().toBytes();

Expand Down Expand Up @@ -668,11 +681,17 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
succinctBytes(failedProcessedInputDataSize),
processedInputPositions,
failedProcessedInputPositions,
new Duration(inputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
new Duration(failedInputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),

succinctBytes(outputDataSize),
succinctBytes(failedOutputDataSize),
outputPositions,
failedOutputPositions,

new Duration(outputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
new Duration(failedOutputBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),

succinctBytes(physicalWrittenDataSize),
succinctBytes(failedPhysicalWrittenDataSize),

Expand Down Expand Up @@ -1232,10 +1251,14 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getFailedProcessedInputDataSize(),
queryStats.getProcessedInputPositions(),
queryStats.getFailedProcessedInputPositions(),
queryStats.getInputBlockedTime(),
queryStats.getFailedInputBlockedTime(),
queryStats.getOutputDataSize(),
queryStats.getFailedOutputDataSize(),
queryStats.getOutputPositions(),
queryStats.getFailedOutputPositions(),
queryStats.getOutputBlockedTime(),
queryStats.getFailedOutputBlockedTime(),
queryStats.getPhysicalWrittenDataSize(),
queryStats.getFailedPhysicalWrittenDataSize(),
queryStats.getStageGcStatistics(),
Expand Down
42 changes: 42 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/QueryStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,17 @@ public class QueryStats
private final long processedInputPositions;
private final long failedProcessedInputPositions;

private final Duration inputBlockedTime;
private final Duration failedInputBlockedTime;

private final DataSize outputDataSize;
private final DataSize failedOutputDataSize;
private final long outputPositions;
private final long failedOutputPositions;

private final Duration outputBlockedTime;
private final Duration failedOutputBlockedTime;

private final DataSize physicalWrittenDataSize;
private final DataSize failedPhysicalWrittenDataSize;

Expand Down Expand Up @@ -192,11 +198,17 @@ public QueryStats(
@JsonProperty("processedInputPositions") long processedInputPositions,
@JsonProperty("failedProcessedInputPositions") long failedProcessedInputPositions,

@JsonProperty("inputBlockedTime") Duration inputBlockedTime,
@JsonProperty("failedInputBlockedTime") Duration failedInputBlockedTime,

@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize,
@JsonProperty("outputPositions") long outputPositions,
@JsonProperty("failedOutputPositions") long failedOutputPositions,

@JsonProperty("outputBlockedTime") Duration outputBlockedTime,
@JsonProperty("failedOutputBlockedTime") Duration failedOutputBlockedTime,

@JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize,
@JsonProperty("failedPhysicalWrittenDataSize") DataSize failedPhysicalWrittenDataSize,

Expand Down Expand Up @@ -290,13 +302,19 @@ public QueryStats(
checkArgument(failedProcessedInputPositions >= 0, "failedProcessedInputPositions is negative");
this.failedProcessedInputPositions = failedProcessedInputPositions;

this.inputBlockedTime = requireNonNull(inputBlockedTime, "inputBlockedTime is null");
this.failedInputBlockedTime = requireNonNull(failedInputBlockedTime, "failedInputBlockedTime is null");

this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null");
this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null");
checkArgument(outputPositions >= 0, "outputPositions is negative");
this.outputPositions = outputPositions;
checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative");
this.failedOutputPositions = failedOutputPositions;

this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null");
this.failedOutputBlockedTime = requireNonNull(failedOutputBlockedTime, "failedOutputBlockedTime is null");

this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "physicalWrittenDataSize is null");
this.failedPhysicalWrittenDataSize = requireNonNull(failedPhysicalWrittenDataSize, "failedPhysicalWrittenDataSize is null");

Expand Down Expand Up @@ -656,6 +674,18 @@ public long getFailedProcessedInputPositions()
return failedProcessedInputPositions;
}

@JsonProperty
public Duration getInputBlockedTime()
{
return inputBlockedTime;
}

@JsonProperty
public Duration getFailedInputBlockedTime()
{
return failedInputBlockedTime;
}

@JsonProperty
public DataSize getOutputDataSize()
{
Expand All @@ -680,6 +710,18 @@ public long getFailedOutputPositions()
return failedOutputPositions;
}

@JsonProperty
public Duration getOutputBlockedTime()
{
return outputBlockedTime;
}

@JsonProperty
public Duration getFailedOutputBlockedTime()
{
return failedOutputBlockedTime;
}

@JsonProperty
public DataSize getPhysicalWrittenDataSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,18 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
long processedInputPositions = 0;
long failedProcessedInputPositions = 0;

long inputBlockedTime = 0;
long failedInputBlockedTime = 0;

long bufferedDataSize = 0;
long outputDataSize = 0;
long failedOutputDataSize = 0;
long outputPositions = 0;
long failedOutputPositions = 0;

long outputBlockedTime = 0;
long failedOutputBlockedTime = 0;

long physicalWrittenDataSize = 0;
long failedPhysicalWrittenDataSize = 0;

Expand Down Expand Up @@ -484,10 +490,14 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
processedInputDataSize += taskStats.getProcessedInputDataSize().toBytes();
processedInputPositions += taskStats.getProcessedInputPositions();

inputBlockedTime += taskStats.getInputBlockedTime().roundTo(NANOSECONDS);

bufferedDataSize += taskInfo.getOutputBuffers().getTotalBufferedBytes();
outputDataSize += taskStats.getOutputDataSize().toBytes();
outputPositions += taskStats.getOutputPositions();

outputBlockedTime += taskStats.getOutputBlockedTime().roundTo(NANOSECONDS);

physicalWrittenDataSize += taskStats.getPhysicalWrittenDataSize().toBytes();

if (taskState == TaskState.FAILED) {
Expand All @@ -504,10 +514,14 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
failedProcessedInputDataSize += taskStats.getProcessedInputDataSize().toBytes();
failedProcessedInputPositions += taskStats.getProcessedInputPositions();

failedInputBlockedTime += taskStats.getInputBlockedTime().roundTo(NANOSECONDS);

failedOutputDataSize += taskStats.getOutputDataSize().toBytes();
failedOutputPositions += taskStats.getOutputPositions();

failedPhysicalWrittenDataSize += taskStats.getPhysicalWrittenDataSize().toBytes();

failedOutputBlockedTime += taskStats.getOutputBlockedTime().roundTo(NANOSECONDS);
}

fullGcCount += taskStats.getFullGcCount();
Expand Down Expand Up @@ -577,11 +591,15 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
succinctBytes(failedProcessedInputDataSize),
processedInputPositions,
failedProcessedInputPositions,
succinctDuration(inputBlockedTime, NANOSECONDS),
succinctDuration(failedInputBlockedTime, NANOSECONDS),
succinctBytes(bufferedDataSize),
succinctBytes(outputDataSize),
succinctBytes(failedOutputDataSize),
outputPositions,
failedOutputPositions,
succinctDuration(outputBlockedTime, NANOSECONDS),
succinctDuration(failedOutputBlockedTime, NANOSECONDS),
succinctBytes(physicalWrittenDataSize),
succinctBytes(failedPhysicalWrittenDataSize),

Expand Down
42 changes: 42 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/StageStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,18 @@ public class StageStats
private final long processedInputPositions;
private final long failedProcessedInputPositions;

private final Duration inputBlockedTime;
private final Duration failedInputBlockedTime;

private final DataSize bufferedDataSize;
private final DataSize outputDataSize;
private final DataSize failedOutputDataSize;
private final long outputPositions;
private final long failedOutputPositions;

private final Duration outputBlockedTime;
private final Duration failedOutputBlockedTime;

private final DataSize physicalWrittenDataSize;
private final DataSize failedPhysicalWrittenDataSize;

Expand Down Expand Up @@ -160,12 +166,18 @@ public StageStats(
@JsonProperty("processedInputPositions") long processedInputPositions,
@JsonProperty("failedProcessedInputPositions") long failedProcessedInputPositions,

@JsonProperty("inputBlockedTime") Duration inputBlockedTime,
@JsonProperty("failedInputBlockedTime") Duration failedInputBlockedTime,

@JsonProperty("bufferedDataSize") DataSize bufferedDataSize,
@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize,
@JsonProperty("outputPositions") long outputPositions,
@JsonProperty("failedOutputPositions") long failedOutputPositions,

@JsonProperty("outputBlockedTime") Duration outputBlockedTime,
@JsonProperty("failedOutputBlockedTime") Duration failedOutputBlockedTime,

@JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize,
@JsonProperty("failedPhysicalWrittenDataSize") DataSize failedPhysicalWrittenDataSize,

Expand Down Expand Up @@ -242,6 +254,9 @@ public StageStats(
checkArgument(failedProcessedInputPositions >= 0, "failedProcessedInputPositions is negative");
this.failedProcessedInputPositions = failedProcessedInputPositions;

this.inputBlockedTime = requireNonNull(inputBlockedTime, "inputBlockedTime is null");
this.failedInputBlockedTime = requireNonNull(failedInputBlockedTime, "failedInputBlockedTime is null");

this.bufferedDataSize = requireNonNull(bufferedDataSize, "bufferedDataSize is null");
this.outputDataSize = requireNonNull(outputDataSize, "outputDataSize is null");
this.failedOutputDataSize = requireNonNull(failedOutputDataSize, "failedOutputDataSize is null");
Expand All @@ -250,6 +265,9 @@ public StageStats(
checkArgument(failedOutputPositions >= 0, "failedOutputPositions is negative");
this.failedOutputPositions = failedOutputPositions;

this.outputBlockedTime = requireNonNull(outputBlockedTime, "outputBlockedTime is null");
this.failedOutputBlockedTime = requireNonNull(failedOutputBlockedTime, "failedOutputBlockedTime is null");

this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "physicalWrittenDataSize is null");
this.failedPhysicalWrittenDataSize = requireNonNull(failedPhysicalWrittenDataSize, "failedPhysicalWrittenDataSize is null");

Expand Down Expand Up @@ -516,6 +534,18 @@ public long getFailedProcessedInputPositions()
return failedProcessedInputPositions;
}

@JsonProperty
public Duration getInputBlockedTime()
{
return inputBlockedTime;
}

@JsonProperty
public Duration getFailedInputBlockedTime()
{
return failedInputBlockedTime;
}

@JsonProperty
public DataSize getBufferedDataSize()
{
Expand Down Expand Up @@ -546,6 +576,18 @@ public long getFailedOutputPositions()
return failedOutputPositions;
}

@JsonProperty
public Duration getOutputBlockedTime()
{
return outputBlockedTime;
}

@JsonProperty
public Duration getFailedOutputBlockedTime()
{
return failedOutputBlockedTime;
}

@JsonProperty
public DataSize getPhysicalWrittenDataSize()
{
Expand Down
Loading

0 comments on commit 93aba37

Please sign in to comment.