Skip to content

Commit

Permalink
add watermark metrics for streaming jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
steveniemitz committed May 19, 2021
1 parent a7ddcef commit c9cc655
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@
@Experimental
public interface WorkerMetricsReceiver {
void receiverCounterUpdates(List<CounterUpdate> updates);
default void receiveTopologyUpdate(String systemName, String userName) { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ public enum StreamingPerStageSystemCounterNames {

STATE_FETCHES("state_fetches_per_stage"),

STATE_FETCH_LATENCY("state_fetch_latency_per_stage");
STATE_FETCH_LATENCY("state_fetch_latency_per_stage"),

INPUT_WATERMARK_LAG("input_watermark_lag_ms"),

OUTPUT_WATERMARK_LAG("output_watermark_lag_ms");

private final String namePrefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand Down Expand Up @@ -484,6 +485,7 @@ public Instant getCommitCreateTime() {
private final Counter<Long, CounterFactory.CounterDistribution> itemsPerStateFetch;
private Timer refreshWorkTimer;
private Timer statusPageTimer;
private Timer resetWatermarkMetricsTimer;

private final boolean publishCounters;
private Timer globalWorkerUpdatesTimer;
Expand Down Expand Up @@ -537,11 +539,41 @@ private static class StageInfo {
final MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry;
final StreamingModeExecutionStateRegistry executionStateRegistry;
final CounterSet deltaCounters;
final CounterSet cumulativeCounters;
final Counter<Long, Long> throttledMsecs;
final Counter<Long, Long> totalProcessingMsecs;
final Counter<Long, Long> timerProcessingMsecs;
final Counter<Long, Long> stateFetches;
final Counter<Long, CounterFactory.CounterDistribution> stateFetchLatency;
final Counter<Long, Long> inputWatermarkLag;
final Counter<Long, Long> outputWatermarkLag;

private long lastWatermarkUpdate = 0L;

void updateWatermarkMetrics(Instant inputDataWatermark, Instant outputDataWatermark) {
long nowMillis = DateTimeUtils.currentTimeMillis();
long watermarkLag =
Math.max(0, nowMillis - inputDataWatermark.getMillis());
inputWatermarkLag.getAndReset();
inputWatermarkLag.addValue(watermarkLag);

if (outputDataWatermark != null) {
long outputWatermarkLagMillis =
Math.max(0, nowMillis - outputDataWatermark.getMillis());
outputWatermarkLag.getAndReset();
outputWatermarkLag.addValue(outputWatermarkLagMillis);
}

lastWatermarkUpdate = nowMillis;
}

void resetStaleWatermarkMetrics() {
long nowMillis = DateTimeUtils.currentTimeMillis();
if (nowMillis - lastWatermarkUpdate > 60_000 * 5) {
inputWatermarkLag.getAndReset();
outputWatermarkLag.getAndReset();
}
}

StageInfo(
String stageName, String systemName, String userName, StreamingDataflowWorker worker) {
Expand All @@ -551,6 +583,7 @@ private static class StageInfo {
executionStateRegistry = new StreamingModeExecutionStateRegistry(worker);
NameContext nameContext = NameContext.create(stageName, null, systemName, userName);
deltaCounters = new CounterSet();
cumulativeCounters = new CounterSet();
throttledMsecs =
deltaCounters.longSum(
StreamingPerStageSystemCounterNames.THROTTLED_MSECS.counterName(nameContext));
Expand All @@ -566,6 +599,12 @@ private static class StageInfo {
stateFetchLatency =
deltaCounters.distribution(
StreamingPerStageSystemCounterNames.STATE_FETCH_LATENCY.counterName(nameContext));
inputWatermarkLag =
cumulativeCounters.longSum(
StreamingPerStageSystemCounterNames.INPUT_WATERMARK_LAG.counterName(nameContext));
outputWatermarkLag =
cumulativeCounters.longSum(
StreamingPerStageSystemCounterNames.OUTPUT_WATERMARK_LAG.counterName(nameContext));
}

List<CounterUpdate> extractCounterUpdates() {
Expand All @@ -579,6 +618,8 @@ List<CounterUpdate> extractCounterUpdates() {
}
counterUpdates.addAll(
deltaCounters.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE));
counterUpdates.addAll(
cumulativeCounters.extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
return counterUpdates;
}

Expand Down Expand Up @@ -950,6 +991,18 @@ public boolean workExecutorIsEmpty() {
public void start() {
running.set(true);

resetWatermarkMetricsTimer = new Timer("ResetWatermarks");
resetWatermarkMetricsTimer.schedule(
new TimerTask() {
@Override
public void run() {
stageInfoMap.values().forEach(StageInfo::resetStaleWatermarkMetrics);
}
},
60_000,
60_000
);

if (windmillServiceEnabled) {
// Schedule the background getConfig thread. Blocks until windmillServer stub is ready.
schedulePeriodicGlobalConfigRequests();
Expand Down Expand Up @@ -1487,7 +1540,7 @@ private void process(
stageInfoMap.computeIfAbsent(
mapTask.getStageName(),
s -> {
String userName = null;
String userName = mapTask.getSystemName();
if (mapTask.getInstructions() != null) {
ParallelInstruction firstInstruction =
Iterables.getFirst(mapTask.getInstructions(), null);
Expand All @@ -1496,6 +1549,9 @@ private void process(
userName = firstInstruction.getName();
}
}

publishTopologyUpdate(mapTask.getSystemName(), userName);

return new StageInfo(s, mapTask.getSystemName(), userName, this);
});

Expand Down Expand Up @@ -1643,6 +1699,8 @@ private void process(
localStateFetcher,
outputBuilder);

stageInfo.updateWatermarkMetrics(inputDataWatermark, outputDataWatermark);

// Blocks while executing work.
executionState.getWorkExecutor().execute();

Expand Down Expand Up @@ -1678,8 +1736,8 @@ private void process(
}

commitQueue.put(new Commit(commitRequest, computationState, work));
commitSizeBytes.addValue((long) commitSize);
commitSizeBytesPerCommit.addValue((long) commitSize);
commitSizeBytes.addValue((long) estimatedCommitSize);
commitSizeBytesPerCommit.addValue((long) estimatedCommitSize);

// Compute shuffle and state byte statistics these will be flushed asynchronously.
long stateBytesWritten = outputBuilder.clearOutputMessages().build().getSerializedSize();
Expand Down Expand Up @@ -2440,6 +2498,16 @@ private void publishCounterUpdates(List<CounterUpdate> updates) {
}
}

private void publishTopologyUpdate(String systemName, String userName) {
for (WorkerMetricsReceiver receiver : workerMetricReceivers) {
try {
receiver.receiveTopologyUpdate(systemName, userName);
} catch (Exception e) {
LOG.error("Error publishing topology update", e);
}
}
}

/**
* Sends a GetData request to Windmill for all sufficiently old active work.
*
Expand Down

0 comments on commit c9cc655

Please sign in to comment.