[FLINK-33306] Use observed source throughput as true processing rate
gyfora authored Oct 24, 2023
1 parent e905a1b commit cc680e1
Showing 23 changed files with 773 additions and 164 deletions.
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -56,6 +56,24 @@
<td>Duration</td>
<td>Scaling metrics aggregation window size.</td>
</tr>
<tr>
<td><h5>job.autoscaler.observed-true-processing-rate.lag-threshold</h5></td>
<td style="word-wrap: break-word;">30 s</td>
<td>Duration</td>
<td>Lag threshold for enabling observed true processing rate measurements.</td>
</tr>
<tr>
<td><h5>job.autoscaler.observed-true-processing-rate.min-observations</h5></td>
<td style="word-wrap: break-word;">2</td>
<td>Integer</td>
<td>Minimum number of observations used when estimating / switching to observed true processing rate.</td>
</tr>
<tr>
<td><h5>job.autoscaler.observed-true-processing-rate.switch-threshold</h5></td>
<td style="word-wrap: break-word;">0.15</td>
<td>Double</td>
<td>Fraction by which the busy-time-based true processing rate must exceed the observed rate before the autoscaler switches to the observed measurement. For example, 0.15 means we switch to the observed rate if the busy-time-based computation is at least 15% higher during catch-up.</td>
</tr>
<tr>
<td><h5>job.autoscaler.restart.time</h5></td>
<td style="word-wrap: break-word;">3 min</td>
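The three options added above work together: observed true-processing-rate measurements are only enabled once the source lag exceeds job.autoscaler.observed-true-processing-rate.lag-threshold (per its description above), at least min-observations samples are needed before the observed rate is trusted, and switch-threshold decides when it replaces the busy-time-based estimate. Below is a minimal, self-contained sketch of that decision using the default values from the table; the class and method names are hypothetical and not part of this commit.

```java
// Illustrative sketch only -- not part of this commit. It mirrors the decision
// described by the configuration options above, using their default values.
public class ObservedTprSwitchExample {

    // Defaults from the generated configuration table above.
    static final double SWITCH_THRESHOLD = 0.15; // ...switch-threshold
    static final int MIN_OBSERVATIONS = 2;       // ...min-observations

    /**
     * Decide whether to prefer the observed (source-throughput based) true
     * processing rate over the busy-time based estimate.
     */
    static boolean useObservedTpr(
            double busyTimeTprAvg, double observedTprAvg, int numObservations) {
        if (Double.isNaN(busyTimeTprAvg) || Double.isInfinite(busyTimeTprAvg)) {
            // Busy-time based rate is unusable -> fall back to the observed rate.
            return true;
        }
        if (numObservations < MIN_OBSERVATIONS || Double.isNaN(observedTprAvg)) {
            // Not enough observed samples yet -> keep the busy-time based rate.
            return false;
        }
        // Switch if the busy-time based estimate is at least 15% higher than
        // what was actually observed while catching up.
        return busyTimeTprAvg > observedTprAvg * (1 + SWITCH_THRESHOLD);
    }

    public static void main(String[] args) {
        // Busy-time estimate 1200 rec/s vs. 1000 rec/s observed during catch-up:
        // 1200 > 1000 * 1.15, so the observed rate wins.
        System.out.println(useObservedTpr(1200, 1000, 3)); // true
        System.out.println(useObservedTpr(1100, 1000, 3)); // false (within threshold)
    }
}
```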
@@ -163,7 +163,6 @@ protected static boolean allVerticesWithinUtilizationTarget(

for (Map.Entry<JobVertexID, ScalingSummary> entry : scalingSummaries.entrySet()) {
var vertex = entry.getKey();
var scalingSummary = entry.getValue();
var metrics = evaluatedMetrics.get(vertex);

double processingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
@@ -59,8 +59,10 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList;
import static org.apache.flink.autoscaler.utils.AutoScalerUtils.excludeVerticesFromScaling;
@@ -90,39 +92,31 @@ public CollectedMetricHistory updateMetrics(
jobKey,
(k) -> {
try {
return stateStore.getEvaluatedMetrics(ctx).orElse(new TreeMap<>());
return stateStore.getCollectedMetrics(ctx).orElse(new TreeMap<>());
} catch (Exception exception) {
throw new RuntimeException(
"Get evaluated metrics failed.", exception);
}
});

// The timestamp of the first metric observation marks the start
// If we haven't collected any metrics, we are starting now
var metricCollectionStartTs = metricHistory.isEmpty() ? now : metricHistory.firstKey();

var jobDetailsInfo =
getJobDetailsInfo(ctx, conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT));
var jobUpdateTs = getJobUpdateTs(jobDetailsInfo);
if (jobUpdateTs.isAfter(metricCollectionStartTs)) {
// We detect job change compared to our collected metrics by checking against the earliest
// metric timestamp
if (!metricHistory.isEmpty() && jobUpdateTs.isAfter(metricHistory.firstKey())) {
LOG.info("Job updated at {}. Clearing metrics.", jobUpdateTs);
stateStore.removeEvaluatedMetrics(ctx);
stateStore.removeCollectedMetrics(ctx);
cleanup(ctx.getJobKey());
metricHistory.clear();
metricCollectionStartTs = now;
}
var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));

// Trim metrics outside the metric window from metrics history
// Calculate the timestamp at which the metric window is full
var metricWindowSize = getMetricWindowSize(conf);
metricHistory.headMap(now.minus(metricWindowSize)).clear();

var stableTime = jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
if (now.isBefore(stableTime)) {
// As long as we are stabilizing, collect no metrics at all
LOG.info("Skipping metric collection during stabilization period until {}", stableTime);
return new CollectedMetricHistory(topology, Collections.emptySortedMap());
}
var windowFullTime =
getWindowFullTime(metricHistory.tailMap(stableTime), now, metricWindowSize);

// The filtered list of metrics we want to query for each vertex
var filteredVertexMetricNames = queryFilteredMetricNames(ctx, topology);
@@ -136,24 +130,38 @@

// Add scaling metrics to history if they were computed successfully
metricHistory.put(now, scalingMetrics);
stateStore.storeEvaluatedMetrics(ctx, metricHistory);

var collectedMetrics = new CollectedMetricHistory(topology, metricHistory);

var windowFullTime = metricCollectionStartTs.plus(metricWindowSize);
collectedMetrics.setFullyCollected(!now.isBefore(windowFullTime));
if (now.isBefore(stableTime)) {
LOG.info("Stabilizing until {}", stableTime);
stateStore.storeCollectedMetrics(ctx, metricHistory);
return new CollectedMetricHistory(topology, Collections.emptySortedMap());
}

if (!collectedMetrics.isFullyCollected()) {
var collectedMetrics = new CollectedMetricHistory(topology, metricHistory);
if (now.isBefore(windowFullTime)) {
LOG.info("Metric window not full until {}", windowFullTime);
} else {
collectedMetrics.setFullyCollected(true);
// Trim metrics outside the metric window from metrics history
metricHistory.headMap(now.minus(metricWindowSize)).clear();
}

stateStore.storeCollectedMetrics(ctx, metricHistory);
return collectedMetrics;
}

protected Duration getMetricWindowSize(Configuration conf) {
return conf.get(AutoScalerOptions.METRICS_WINDOW);
}

private static Instant getWindowFullTime(
SortedMap<Instant, CollectedMetrics> metricsAfterStable,
Instant now,
Duration metricWindowSize) {
return metricsAfterStable.isEmpty()
? now.plus(metricWindowSize)
: metricsAfterStable.firstKey().plus(metricWindowSize);
}

@VisibleForTesting
protected Instant getJobUpdateTs(JobDetailsInfo jobDetailsInfo) {
return Instant.ofEpochMilli(
@@ -265,9 +273,11 @@ private CollectedMetrics convertToScalingMetrics(
ScalingMetrics.computeLoadMetrics(
jobVertexID, vertexFlinkMetrics, vertexScalingMetrics, conf);

var metricHistory =
histories.getOrDefault(jobKey, Collections.emptySortedMap());
double lagGrowthRate =
computeLagGrowthRate(
jobKey,
metricHistory,
jobVertexID,
vertexScalingMetrics.get(ScalingMetric.LAG));

@@ -277,8 +287,13 @@
vertexScalingMetrics,
jobTopology,
lagGrowthRate,
conf);

conf,
observedTprAvg(
jobVertexID,
metricHistory,
conf.get(
AutoScalerOptions
.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS)));
vertexScalingMetrics
.entrySet()
.forEach(e -> e.setValue(ScalingMetrics.roundMetric(e.getValue())));
@@ -292,10 +307,21 @@ private CollectedMetrics convertToScalingMetrics(
return new CollectedMetrics(out, outputRatios);
}

private double computeLagGrowthRate(KEY jobKey, JobVertexID jobVertexID, Double currentLag) {
var metricHistory = histories.get(jobKey);
private static Supplier<Double> observedTprAvg(
JobVertexID jobVertexID,
SortedMap<Instant, CollectedMetrics> metricHistory,
int minObservations) {
return () ->
ScalingMetricEvaluator.getAverage(
ScalingMetric.OBSERVED_TPR, jobVertexID, metricHistory, minObservations);
}

private double computeLagGrowthRate(
SortedMap<Instant, CollectedMetrics> metricHistory,
JobVertexID jobVertexID,
Double currentLag) {

if (metricHistory == null || metricHistory.isEmpty()) {
if (metricHistory.isEmpty()) {
return Double.NaN;
}

@@ -332,30 +358,39 @@ protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
&& previousMetricNames
.keySet()
.equals(topology.getParallelisms().keySet())) {
// We have already gathered the metric names for this topology
return previousMetricNames;
var newMetricNames = new HashMap<>(previousMetricNames);
var sourceMetricNames =
queryFilteredMetricNames(
ctx,
topology,
vertices.stream().filter(topology::isSource));
newMetricNames.putAll(sourceMetricNames);
return newMetricNames;
}

try (var restClient = ctx.getRestClusterClient()) {
return vertices.stream()
.filter(v -> !topology.getFinishedVertices().contains(v))
.collect(
Collectors.toMap(
v -> v,
v ->
getFilteredVertexMetricNames(
restClient,
ctx.getJobID(),
v,
topology)));
} catch (Exception e) {
throw new RuntimeException(e);
}
// Query all metric names
return queryFilteredMetricNames(ctx, topology, vertices.stream());
});
names.keySet().removeAll(topology.getFinishedVertices());
return names;
}

private Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
Context ctx, JobTopology topology, Stream<JobVertexID> vertexStream) {
try (var restClient = ctx.getRestClusterClient()) {
return vertexStream
.filter(v -> !topology.getFinishedVertices().contains(v))
.collect(
Collectors.toMap(
v -> v,
v ->
getFilteredVertexMetricNames(
restClient, ctx.getJobID(), v, topology)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Query and filter metric names for a given job vertex.
*
@@ -378,6 +413,7 @@ protected Map<String, FlinkMetric> getFilteredVertexMetricNames(
requiredMetrics.add(FlinkMetric.BUSY_TIME_PER_SEC);

if (topology.isSource(jobVertexID)) {
requiredMetrics.add(FlinkMetric.BACKPRESSURE_TIME_PER_SEC);
requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
// Pending records metric won't be available for some sources.
// The Kafka source, for instance, lazily initializes this metric on receiving
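One detail worth calling out in the collector change above: the observed-TPR average is handed to the metric computation as a Supplier&lt;Double&gt;, so the history scan only happens if the computation actually needs the value. The following is a simplified, standalone sketch of that lazy-lookup pattern; the map type and helper below are illustrative stand-ins, not the commit's real classes.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Illustrative sketch of the lazy average lookup passed into the scaling-metric
// computation; types are simplified stand-ins for the commit's real classes.
public class LazyObservedTprExample {

    static Supplier<Double> observedTprAvg(
            Map<Long, Double> observedTprHistory, int minObservations) {
        // The average is only computed when (and if) the supplier is invoked.
        return () -> {
            if (observedTprHistory.size() < minObservations) {
                return Double.NaN; // Not enough observations yet.
            }
            return observedTprHistory.values().stream()
                    .mapToDouble(Double::doubleValue)
                    .average()
                    .orElse(Double.NaN);
        };
    }

    public static void main(String[] args) {
        var history = new TreeMap<Long, Double>();
        history.put(1L, 900.0);
        history.put(2L, 1100.0);

        Supplier<Double> avg = observedTprAvg(history, 2);
        // Nothing has been scanned yet; the work happens on get().
        System.out.println(avg.get()); // 1000.0
    }
}
```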
@@ -48,6 +48,7 @@
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
@@ -135,9 +136,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(

evaluatedMetrics.put(
TRUE_PROCESSING_RATE,
new EvaluatedScalingMetric(
latestVertexMetrics.get(TRUE_PROCESSING_RATE),
getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory)));
evaluateTpr(metricsHistory, vertex, latestVertexMetrics, conf));

evaluatedMetrics.put(
LOAD,
@@ -154,6 +153,57 @@
return evaluatedMetrics;
}

private static EvaluatedScalingMetric evaluateTpr(
SortedMap<Instant, CollectedMetrics> metricsHistory,
JobVertexID vertex,
Map<ScalingMetric, Double> latestVertexMetrics,
Configuration conf) {

var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory);
var observedTprAvg =
getAverage(
OBSERVED_TPR,
vertex,
metricsHistory,
conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS));

var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, observedTprAvg);
return new EvaluatedScalingMetric(
latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
}

private static ScalingMetric selectTprMetric(
JobVertexID jobVertexID,
Configuration conf,
double busyTimeTprAvg,
double observedTprAvg) {

if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
return OBSERVED_TPR;
}

if (Double.isNaN(observedTprAvg)) {
return TRUE_PROCESSING_RATE;
}

double switchThreshold =
conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD);
// If we could measure the observed tpr we decide whether to switch to using it
// instead of busy time based on the error / difference between the two
if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {
LOG.debug(
"Using observed tpr {} for {} as busy time based seems too large ({})",
observedTprAvg,
jobVertexID,
busyTimeTprAvg);
return OBSERVED_TPR;
} else {
LOG.debug("Using busy time based tpr {} for {}.", busyTimeTprAvg, jobVertexID);
return TRUE_PROCESSING_RATE;
}
}

@VisibleForTesting
protected static void computeProcessingRateThresholds(
Map<ScalingMetric, EvaluatedScalingMetric> metrics,
@@ -241,25 +291,40 @@ private void computeTargetDataRate(
}
}

private static double getAverage(
public static double getAverage(
ScalingMetric metric,
JobVertexID jobVertexId,
SortedMap<Instant, CollectedMetrics> metricsHistory) {
double[] metricValues =
metricsHistory.values().stream()
.map(m -> m.getVertexMetrics().get(jobVertexId))
.filter(m -> m.containsKey(metric))
.mapToDouble(m -> m.get(metric))
.filter(d -> !Double.isNaN(d))
.toArray();
for (double metricValue : metricValues) {
if (Double.isInfinite(metricValue)) {
// As long as infinite values are present, we can't properly average. We need to
// wait until they are evicted.
return metricValue;
return getAverage(metric, jobVertexId, metricsHistory, 1);
}

public static double getAverage(
ScalingMetric metric,
JobVertexID jobVertexId,
SortedMap<Instant, CollectedMetrics> metricsHistory,
int minElements) {

double sum = 0;
int n = 0;
boolean anyInfinite = false;
for (var collectedMetrics : metricsHistory.values()) {
var metrics = collectedMetrics.getVertexMetrics().get(jobVertexId);
double num = metrics.getOrDefault(metric, Double.NaN);
if (Double.isNaN(num)) {
continue;
}
if (Double.isInfinite(num)) {
anyInfinite = true;
continue;
}

sum += num;
n++;
}
return StatUtils.mean(metricValues);
if (n == 0) {
return anyInfinite ? Double.POSITIVE_INFINITY : Double.NaN;
}
return n < minElements ? Double.NaN : sum / n;
}

private static double getAverageOutputRatio(
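The reworked getAverage above also changes how incomplete data is handled: NaN samples are skipped, infinite samples no longer dominate the result (they only matter if there is no finite sample at all), and a minimum number of finite samples can be required before the average is trusted. A compact restatement of those rules over a plain array, for illustration only:

```java
// Standalone restatement of the averaging rules shown above, applied to a plain
// array instead of the collected metric history (illustrative only).
public class MetricAverageExample {

    static double average(double[] values, int minElements) {
        double sum = 0;
        int n = 0;
        boolean anyInfinite = false;
        for (double v : values) {
            if (Double.isNaN(v)) {
                continue; // Missing observation -> ignored.
            }
            if (Double.isInfinite(v)) {
                anyInfinite = true; // Remembered, but excluded from the average.
                continue;
            }
            sum += v;
            n++;
        }
        if (n == 0) {
            // Only infinite (or no) observations: surface infinity, otherwise "unknown".
            return anyInfinite ? Double.POSITIVE_INFINITY : Double.NaN;
        }
        // Require a minimum number of finite observations before trusting the average.
        return n < minElements ? Double.NaN : sum / n;
    }

    public static void main(String[] args) {
        double inf = Double.POSITIVE_INFINITY;
        double nan = Double.NaN;
        System.out.println(average(new double[] {1000, nan, inf, 1200}, 2)); // 1100.0
        System.out.println(average(new double[] {inf, nan}, 1));             // Infinity
        System.out.println(average(new double[] {1000}, 2));                 // NaN
    }
}
```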