-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose fault tolerant execution and filesystem exchange metrics via JMX #12127
Conversation
I had to move exchange plugin from the |
33de1e0
to
06986a0
Compare
{ | ||
ExecutionFailureInfo failureInfo = info.getTaskStatus().getFailures().stream() | ||
.findFirst() | ||
.orElse(toFailure(new TrinoException(GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason"))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When will this happen in practice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather unlikely, but we have a similar safeguard in other places
Bootstrap app = new Bootstrap( | ||
new MBeanModule(), | ||
new MBeanServerModule(), | ||
new PrefixObjectNameGeneratorModule("io.trino.plugin.exchange", "trino.plugin.exchange"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to export the JMX beans under a specific prefix
ImmutableList.Builder<ListenableFuture<Void>> futures = ImmutableList.builder(); | ||
for (Integer taskPartitionId : allSinks) { | ||
exchangeStorage.deleteRecursively(getTaskOutputDirectory(taskPartitionId)); | ||
futures.add(exchangeStorage.deleteRecursively(getTaskOutputDirectory(taskPartitionId))); | ||
} | ||
stats.getCloseExchange().record(Futures.allAsList(futures.build())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not on the critical path, do we need to record here too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be useful for tracking (for example if for some reason it starts to fail)
@Managed | ||
@Nested |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For my understanding, can you briefly explain what these two annotations do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Methods annotated with @Managed
are exported via JMX. @Nested
tells the framework to recurse into the object returned by a method and export nested methods annotated with @Managed
.
private final List<ExchangeStorageReader> readers; | ||
private volatile CompletableFuture<Void> blocked; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this an optimization? Why do we need a class member blocked
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to make sure only a single blocked
future exists and tracked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite understand, why not simply the following:
return stats.getExchangeSourceBlocked().record(toCompletableFuture(
nonCancellationPropagating(
whenAnyComplete(readers.stream()
.map(ExchangeStorageReader::isBlocked)
.collect(toImmutableList())))));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because isBlocked
is called by multiple threads concurrently and a different feature may be returned to a different thread skewing the metric while ideally we would like to keep our measurements as close as possible to the time it takes for the entire ExchangeSource
to transition from "blocked" state to "non-blocked".
case RUNNING: | ||
case FLUSHING: | ||
default: | ||
log.error("Unexpected task state: %s", state); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not throw?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exceptions thrown out of a listener are not logged. So it's more of a "log it or loose it".
case CANCELED: | ||
case ABORTED: | ||
// ignore cancelled and aborted tasks | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know currently dependent tasks are not executing until upstream task comptes. Yet it will change. In such case would we make downstream task as "FAILED" or "ABORTED".
If the latter then we should also compute stats for "ABORTED" tasks. It would be important to understand how much effort we are wasting on those.
Maybe we can just merge FAILED and ABORTED?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to add private final ExecutionStats abortedTasks = new ExecutionStats();
and store the stats for both, ABORTED
and CANCELLED
tasks there. Not sure if it makes sense to track metrics separately.
private final TimeStat elapsedTime = new TimeStat(MILLISECONDS); | ||
private final TimeStat scheduledTime = new TimeStat(MILLISECONDS); | ||
private final TimeStat cpuTime = new TimeStat(MILLISECONDS); | ||
private final TimeStat inputBlockedTime = new TimeStat(MILLISECONDS); | ||
private final TimeStat outputBlockedTime = new TimeStat(MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we have memory and network stats here too?
It feels not costly to add them and then we can decide which are the most important for us for tracking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure if there's a reliable metric for network as the network traffic can occur at many different levels (connector / exchanges / coordinator-to-worker communication). Though it certainly feels like it would make sense to record peak memory utilization. Let me add it.
core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantExecutionStats.java
Outdated
Show resolved
Hide resolved
|
||
public <T> CompletableFuture<T> record(CompletableFuture<T> future) | ||
{ | ||
long start = System.currentTimeMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Use Ticker/Stopwatch pair instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Move from `io.trino.plugin.exchange` to `io.trino.plugin.exchange.filesystem`
From trino-exchange to trino-exchange-filesystem
06986a0
to
6f8d936
Compare
Description
Exposes fault tolerant execution related operational metrics via JMX to enable live monitoring
Improvement
Core, Exchange
N/A
Related issues, pull requests, and links
-
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
(x) No release notes entries required.
( ) Release notes entries required with the following suggested text: