[FLINK-33187] Use hashcode to deduplicate scaling events
clarax authored Oct 24, 2023
1 parent cc680e1 commit faaff56
Showing 14 changed files with 791 additions and 237 deletions.
@@ -117,7 +117,7 @@
<td>Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.</td>
</tr>
<tr>
<td><h5>job.autoscaler.scaling.report.interval</h5></td>
<td><h5>job.autoscaler.scaling.event.interval</h5></td>
<td style="word-wrap: break-word;">30 min</td>
<td>Duration</td>
<td>Time interval to resend the identical event</td>
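For reference, the renamed option can also be set programmatically. A minimal sketch, assuming only the public Flink Configuration API and the AutoScalerOptions.SCALING_EVENT_INTERVAL constant introduced further down in this commit:

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;

import java.time.Duration;

public class ScalingEventIntervalExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Resend identical scaling events at most once every 10 minutes
        // instead of relying on the 30 minute default.
        conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ofMinutes(10));
        // In deployment configuration the same setting is keyed as
        // job.autoscaler.scaling.event.interval (see the docs table above).
        System.out.println(conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
    }
}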
@@ -39,6 +39,7 @@
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
@@ -219,7 +220,7 @@ private boolean detectIneffectiveScaleUp(
INEFFECTIVE_SCALING,
message,
null,
null);
conf.get(SCALING_EVENT_INTERVAL));

if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
LOG.warn(
@@ -38,23 +38,15 @@
import java.util.SortedMap;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;

/** Class responsible for executing scaling decisions. */
public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
public static final String SCALING_SUMMARY_ENTRY =
" Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
public static final String SCALING_SUMMARY_HEADER_SCALING_DISABLED =
"Recommended parallelism change:";
public static final String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:";
@VisibleForTesting static final String SCALING_REPORT_REASON = "ScalingReport";

private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);

private final JobVertexScaler<KEY, Context> jobVertexScaler;
@@ -100,18 +92,11 @@ public boolean scaleResource(

updateRecommendedParallelism(evaluatedMetrics, scalingSummaries);

var scalingEnabled = conf.get(SCALING_ENABLED);

var scalingReport = scalingReport(scalingSummaries, scalingEnabled);
autoScalerEventHandler.handleEvent(
context,
AutoScalerEventHandler.Type.Normal,
SCALING_REPORT_REASON,
scalingReport,
"ScalingExecutor",
scalingEnabled ? null : conf.get(AutoScalerOptions.SCALING_REPORT_INTERVAL));
var scaleEnabled = conf.get(SCALING_ENABLED);
autoScalerEventHandler.handleScalingEvent(
context, scalingSummaries, scaleEnabled, conf.get(SCALING_EVENT_INTERVAL));

if (!scalingEnabled) {
if (!scaleEnabled) {
return false;
}

@@ -136,27 +121,6 @@ private void updateRecommendedParallelism(
scalingSummary.getNewParallelism())));
}

private static String scalingReport(
Map<JobVertexID, ScalingSummary> scalingSummaries, boolean scalingEnabled) {
StringBuilder sb =
new StringBuilder(
scalingEnabled
? SCALING_SUMMARY_HEADER_SCALING_ENABLED
: SCALING_SUMMARY_HEADER_SCALING_DISABLED);
scalingSummaries.forEach(
(v, s) ->
sb.append(
String.format(
SCALING_SUMMARY_ENTRY,
v,
s.getCurrentParallelism(),
s.getNewParallelism(),
s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(),
s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(),
s.getMetrics().get(TARGET_DATA_RATE).getAverage())));
return sb.toString();
}

protected static boolean allVerticesWithinUtilizationTarget(
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
Map<JobVertexID, ScalingSummary> scalingSummaries) {
@@ -190,7 +154,8 @@ protected static boolean allVerticesWithinUtilizationTarget(
return true;
}

private Map<JobVertexID, ScalingSummary> computeScalingSummary(
@VisibleForTesting
Map<JobVertexID, ScalingSummary> computeScalingSummary(
Context context,
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
@@ -231,10 +231,11 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");

public static final ConfigOption<Duration> SCALING_REPORT_INTERVAL =
autoScalerConfig("scaling.report.interval")
public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
autoScalerConfig("scaling.event.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1800))
.withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.event.interval"))
.withDescription("Time interval to resend the identical event");

public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =
@@ -19,10 +19,17 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Map;

import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;

/**
* Handler for autoscaler events.
@@ -32,12 +39,17 @@
*/
@Experimental
public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
String SCALING_SUMMARY_ENTRY =
" Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
String SCALING_SUMMARY_HEADER_SCALING_DISABLED = "Recommended parallelism change:";
String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:";
String SCALING_REPORT_REASON = "ScalingReport";
String SCALING_REPORT_KEY = "ScalingExecutor";

/**
* Handle the event.
*
* @param interval When interval is great than 0, events that repeat within the interval will be
* ignored.
* @param interval Defines the interval used to suppress duplicate events; no dedupe if null.
*/
void handleEvent(
Context context,
@@ -47,6 +59,50 @@ void handleEvent(
@Nullable String messageKey,
@Nullable Duration interval);

/**
* Handle scaling reports.
*
* @param interval Define the interval to suppress duplicate events.
* @param scaled Whether AutoScaler actually scaled the Flink job or just generated advice for
* scaling.
*/
default void handleScalingEvent(
Context context,
Map<JobVertexID, ScalingSummary> scalingSummaries,
boolean scaled,
Duration interval) {
// Provide default implementation without proper deduplication
var scalingReport = scalingReport(scalingSummaries, scaled);
handleEvent(
context,
Type.Normal,
SCALING_REPORT_REASON,
scalingReport,
SCALING_REPORT_KEY,
interval);
}

static String scalingReport(
Map<JobVertexID, ScalingSummary> scalingSummaries, boolean scalingEnabled) {
StringBuilder sb =
new StringBuilder(
scalingEnabled
? SCALING_SUMMARY_HEADER_SCALING_ENABLED
: SCALING_SUMMARY_HEADER_SCALING_DISABLED);
scalingSummaries.forEach(
(v, s) ->
sb.append(
String.format(
SCALING_SUMMARY_ENTRY,
v,
s.getCurrentParallelism(),
s.getNewParallelism(),
s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(),
s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(),
s.getMetrics().get(TARGET_DATA_RATE).getAverage())));
return sb.toString();
}

/** The type of the events. */
enum Type {
Normal,
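The hypothetical handler below illustrates the dedupe scheme the commit title refers to: key each event by the hashcode of its message (or message key) and suppress repeats that arrive within the configured interval. This is only a sketch. It assumes JobAutoScalerContext#getJobKey is available, and it logs to stdout instead of creating Kubernetes Events the way the operator's handler does.

import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;

import javax.annotation.Nullable;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a handler that dedupes identical events via the message hashcode. */
public class HashDedupingEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
        implements AutoScalerEventHandler<KEY, Context> {

    // Timestamp of the last emission per (job key, reason, message hash).
    private final Map<String, Instant> lastEmitted = new ConcurrentHashMap<>();

    @Override
    public void handleEvent(
            Context context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey,
            @Nullable Duration interval) {
        String dedupeKey =
                context.getJobKey()
                        + "/"
                        + reason
                        + "/"
                        + Objects.hashCode(messageKey != null ? messageKey : message);
        Instant now = Instant.now();
        if (interval != null && !interval.isZero()) {
            Instant last = lastEmitted.get(dedupeKey);
            if (last != null && last.plus(interval).isAfter(now)) {
                return; // Identical event inside the interval: suppress it.
            }
        }
        lastEmitted.put(dedupeKey, now);
        System.out.printf("[%s] %s: %s%n", type, reason, message);
    }
}

The tests below exercise the intended semantics: with the default or any positive interval, a repeated ineffective-scaling message is suppressed, while an interval of zero lets it through and the collected event's count increases from 1 to 2.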
@@ -358,6 +358,39 @@ public void testSendingIneffectiveScalingEvents() {
assertThat(event.getMessage())
.isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID));
assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
assertEquals(1, event.getCount());

// Repeat ineffective scale with default interval, no event is triggered
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(
context, jobVertexID, evaluated, history));
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
assertEquals(0, eventCollector.events.size());

// Repeat ineffective scale with positive interval, no event is triggered
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ofSeconds(1800));
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(
context, jobVertexID, evaluated, history));
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
assertEquals(0, eventCollector.events.size());

// Ineffective scale with interval set to 0, an event is triggered
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
assertEquals(
20,
vertexScaler.computeScaleTargetParallelism(
context, jobVertexID, evaluated, history));
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
assertEquals(1, eventCollector.events.size());
event = eventCollector.events.poll();
assertThat(event).isNotNull();
assertThat(event.getMessage())
.isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID));
assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
assertEquals(2, event.getCount());
}

private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
@@ -37,8 +37,11 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.autoscaler.ScalingExecutor.SCALING_SUMMARY_ENTRY;
import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_REPORT_REASON;
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_ENTRY;
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_DISABLED;
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -151,19 +154,20 @@ public void testVertexesExclusionForScaling() throws Exception {

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testScalingEventsWith0Interval(boolean scalingEnabled) throws Exception {
public void testScalingEventsWith0IntervalConfig(boolean scalingEnabled) throws Exception {
testScalingEvents(scalingEnabled, Duration.ofSeconds(0));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testScalingEventsWithInterval(boolean scalingEnabled) throws Exception {
public void testScalingEventsWithIntervalConfig(boolean scalingEnabled) throws Exception {
testScalingEvents(scalingEnabled, Duration.ofSeconds(1800));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testScalingEventsWithDefaultInterval(boolean scalingEnabled) throws Exception {
public void testScalingEventsWithDefaultIntervalConfig(boolean scalingEnabled)
throws Exception {
testScalingEvents(scalingEnabled, null);
}

@@ -175,17 +179,13 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
var metrics = Map.of(jobVertexID, evaluated(1, 110, 100));

if (interval != null) {
conf.set(AutoScalerOptions.SCALING_REPORT_INTERVAL, interval);
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, interval);
}

assertEquals(scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics));
assertEquals(scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics));

int expectedSize =
(interval == null || (!interval.isNegative() && !interval.isZero()))
&& !scalingEnabled
? 1
: 2;
int expectedSize = (interval == null || interval.toMillis() > 0) && !scalingEnabled ? 1 : 2;
assertEquals(expectedSize, eventCollector.events.size());

TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>> event;
Expand All @@ -208,9 +208,9 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
event.getMessage()
.contains(
scalingEnabled
? ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_ENABLED
: ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_DISABLED));
assertEquals(ScalingExecutor.SCALING_REPORT_REASON, event.getReason());
? SCALING_SUMMARY_HEADER_SCALING_ENABLED
: SCALING_SUMMARY_HEADER_SCALING_DISABLED));
assertEquals(SCALING_REPORT_REASON, event.getReason());

metrics = Map.of(jobVertexID, evaluated(1, 110, 101));
