diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 0221b56741..0b5f33a8e2 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -117,7 +117,7 @@
Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs. |
- job.autoscaler.scaling.report.interval |
+ job.autoscaler.scaling.event.interval |
30 min |
Duration |
Time interval to resend the identical event |
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 9bc46b7e57..aba6ac6749 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -39,6 +39,7 @@
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
@@ -219,7 +220,7 @@ private boolean detectIneffectiveScaleUp(
INEFFECTIVE_SCALING,
message,
null,
- null);
+ conf.get(SCALING_EVENT_INTERVAL));
if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
LOG.warn(
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index 406b17d734..0e1a47b8e7 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -38,23 +38,15 @@
import java.util.SortedMap;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
-import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
/** Class responsible for executing scaling decisions. */
public class ScalingExecutor> {
- public static final String SCALING_SUMMARY_ENTRY =
- " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
- public static final String SCALING_SUMMARY_HEADER_SCALING_DISABLED =
- "Recommended parallelism change:";
- public static final String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:";
- @VisibleForTesting static final String SCALING_REPORT_REASON = "ScalingReport";
-
private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
private final JobVertexScaler jobVertexScaler;
@@ -100,18 +92,11 @@ public boolean scaleResource(
updateRecommendedParallelism(evaluatedMetrics, scalingSummaries);
- var scalingEnabled = conf.get(SCALING_ENABLED);
-
- var scalingReport = scalingReport(scalingSummaries, scalingEnabled);
- autoScalerEventHandler.handleEvent(
- context,
- AutoScalerEventHandler.Type.Normal,
- SCALING_REPORT_REASON,
- scalingReport,
- "ScalingExecutor",
- scalingEnabled ? null : conf.get(AutoScalerOptions.SCALING_REPORT_INTERVAL));
+ var scaleEnabled = conf.get(SCALING_ENABLED);
+ autoScalerEventHandler.handleScalingEvent(
+ context, scalingSummaries, scaleEnabled, conf.get(SCALING_EVENT_INTERVAL));
- if (!scalingEnabled) {
+ if (!scaleEnabled) {
return false;
}
@@ -136,27 +121,6 @@ private void updateRecommendedParallelism(
scalingSummary.getNewParallelism())));
}
- private static String scalingReport(
- Map scalingSummaries, boolean scalingEnabled) {
- StringBuilder sb =
- new StringBuilder(
- scalingEnabled
- ? SCALING_SUMMARY_HEADER_SCALING_ENABLED
- : SCALING_SUMMARY_HEADER_SCALING_DISABLED);
- scalingSummaries.forEach(
- (v, s) ->
- sb.append(
- String.format(
- SCALING_SUMMARY_ENTRY,
- v,
- s.getCurrentParallelism(),
- s.getNewParallelism(),
- s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(),
- s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(),
- s.getMetrics().get(TARGET_DATA_RATE).getAverage())));
- return sb.toString();
- }
-
protected static boolean allVerticesWithinUtilizationTarget(
Map> evaluatedMetrics,
Map scalingSummaries) {
@@ -190,7 +154,8 @@ protected static boolean allVerticesWithinUtilizationTarget(
return true;
}
- private Map computeScalingSummary(
+ @VisibleForTesting
+ Map computeScalingSummary(
Context context,
Map> evaluatedMetrics,
Map> scalingHistory) {
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 626921bcb6..e8872a35c5 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -231,10 +231,11 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
.withDescription(
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
- public static final ConfigOption SCALING_REPORT_INTERVAL =
- autoScalerConfig("scaling.report.interval")
+ public static final ConfigOption SCALING_EVENT_INTERVAL =
+ autoScalerConfig("scaling.event.interval")
.durationType()
.defaultValue(Duration.ofSeconds(1800))
+ .withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.event.interval"))
.withDescription("Time interval to resend the identical event");
public static final ConfigOption FLINK_CLIENT_TIMEOUT =
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index a5a0edfefe..5c49f9c550 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -19,10 +19,17 @@
import org.apache.flink.annotation.Experimental;
import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.Map;
+
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
/**
* Handler for autoscaler events.
@@ -32,12 +39,17 @@
*/
@Experimental
public interface AutoScalerEventHandler> {
+ String SCALING_SUMMARY_ENTRY =
+ " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
+ String SCALING_SUMMARY_HEADER_SCALING_DISABLED = "Recommended parallelism change:";
+ String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:";
+ String SCALING_REPORT_REASON = "ScalingReport";
+ String SCALING_REPORT_KEY = "ScalingExecutor";
/**
* Handle the event.
*
- * @param interval When interval is great than 0, events that repeat within the interval will be
- * ignored.
+ * @param interval Define the interval to suppress duplicate events. No dedupe if null.
*/
void handleEvent(
Context context,
@@ -47,6 +59,50 @@ void handleEvent(
@Nullable String messageKey,
@Nullable Duration interval);
+ /**
+ * Handle scaling reports.
+ *
+ * @param interval Define the interval to suppress duplicate events.
+ * @param scaled Whether AutoScaler actually scaled the Flink job or just generated advice for
+ * scaling.
+ */
+ default void handleScalingEvent(
+ Context context,
+ Map scalingSummaries,
+ boolean scaled,
+ Duration interval) {
+ // Provide default implementation without proper deduplication
+ var scalingReport = scalingReport(scalingSummaries, scaled);
+ handleEvent(
+ context,
+ Type.Normal,
+ SCALING_REPORT_REASON,
+ scalingReport,
+ SCALING_REPORT_KEY,
+ interval);
+ }
+
+ static String scalingReport(
+ Map scalingSummaries, boolean scalingEnabled) {
+ StringBuilder sb =
+ new StringBuilder(
+ scalingEnabled
+ ? SCALING_SUMMARY_HEADER_SCALING_ENABLED
+ : SCALING_SUMMARY_HEADER_SCALING_DISABLED);
+ scalingSummaries.forEach(
+ (v, s) ->
+ sb.append(
+ String.format(
+ SCALING_SUMMARY_ENTRY,
+ v,
+ s.getCurrentParallelism(),
+ s.getNewParallelism(),
+ s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(),
+ s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(),
+ s.getMetrics().get(TARGET_DATA_RATE).getAverage())));
+ return sb.toString();
+ }
+
/** The type of the events. */
enum Type {
Normal,
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index d70db971dd..1b01087c8a 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -358,6 +358,39 @@ public void testSendingIneffectiveScalingEvents() {
assertThat(event.getMessage())
.isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID));
assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
+ assertEquals(1, event.getCount());
+
+ // Repeat ineffective scale with default interval, no event is triggered
+ assertEquals(
+ 20,
+ vertexScaler.computeScaleTargetParallelism(
+ context, jobVertexID, evaluated, history));
+ assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+ assertEquals(0, eventCollector.events.size());
+
+ // Repeat ineffective scale with positive interval, no event is triggered
+ conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ofSeconds(1800));
+ assertEquals(
+ 20,
+ vertexScaler.computeScaleTargetParallelism(
+ context, jobVertexID, evaluated, history));
+ assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+ assertEquals(0, eventCollector.events.size());
+
+ // Ineffective scale with interval set to 0, an event is triggered
+ conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
+ assertEquals(
+ 20,
+ vertexScaler.computeScaleTargetParallelism(
+ context, jobVertexID, evaluated, history));
+ assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+ assertEquals(1, eventCollector.events.size());
+ event = eventCollector.events.poll();
+ assertThat(event).isNotNull();
+ assertThat(event.getMessage())
+ .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID));
+ assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
+ assertEquals(2, event.getCount());
}
private Map evaluated(
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index 36fb6e31db..bae6f36b2a 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -37,8 +37,11 @@
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.flink.autoscaler.ScalingExecutor.SCALING_SUMMARY_ENTRY;
import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_REPORT_REASON;
+import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_ENTRY;
+import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_DISABLED;
+import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -151,19 +154,20 @@ public void testVertexesExclusionForScaling() throws Exception {
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testScalingEventsWith0Interval(boolean scalingEnabled) throws Exception {
+ public void testScalingEventsWith0IntervalConfig(boolean scalingEnabled) throws Exception {
testScalingEvents(scalingEnabled, Duration.ofSeconds(0));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testScalingEventsWithInterval(boolean scalingEnabled) throws Exception {
+ public void testScalingEventsWithIntervalConfig(boolean scalingEnabled) throws Exception {
testScalingEvents(scalingEnabled, Duration.ofSeconds(1800));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testScalingEventsWithDefaultInterval(boolean scalingEnabled) throws Exception {
+ public void testScalingEventsWithDefaultIntervalConfig(boolean scalingEnabled)
+ throws Exception {
testScalingEvents(scalingEnabled, null);
}
@@ -175,17 +179,13 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
var metrics = Map.of(jobVertexID, evaluated(1, 110, 100));
if (interval != null) {
- conf.set(AutoScalerOptions.SCALING_REPORT_INTERVAL, interval);
+ conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, interval);
}
assertEquals(scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics));
assertEquals(scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics));
- int expectedSize =
- (interval == null || (!interval.isNegative() && !interval.isZero()))
- && !scalingEnabled
- ? 1
- : 2;
+ int expectedSize = (interval == null || interval.toMillis() > 0) && !scalingEnabled ? 1 : 2;
assertEquals(expectedSize, eventCollector.events.size());
TestingEventCollector.Event> event;
@@ -208,9 +208,9 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
event.getMessage()
.contains(
scalingEnabled
- ? ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_ENABLED
- : ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_DISABLED));
- assertEquals(ScalingExecutor.SCALING_REPORT_REASON, event.getReason());
+ ? SCALING_SUMMARY_HEADER_SCALING_ENABLED
+ : SCALING_SUMMARY_HEADER_SCALING_DISABLED));
+ assertEquals(SCALING_REPORT_REASON, event.getReason());
metrics = Map.of(jobVertexID, evaluated(1, 110, 101));
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java
index bd742cde5e..485f9c7e22 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/TestingEventCollector.java
@@ -20,6 +20,7 @@
import org.apache.flink.autoscaler.JobAutoScalerContext;
import lombok.Getter;
+import lombok.Setter;
import javax.annotation.Nullable;
@@ -30,6 +31,8 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
+
/** Testing {@link AutoScalerEventHandler} implementation. */
public class TestingEventCollector>
implements AutoScalerEventHandler {
@@ -45,17 +48,18 @@ public void handleEvent(
String reason,
String message,
@Nullable String messageKey,
- @Nullable Duration interval) {
+ Duration interval) {
String eventKey =
generateEventKey(context, type, reason, messageKey != null ? messageKey : message);
Event event = eventMap.get(eventKey);
+ var scaled = context.getConfiguration().get(SCALING_ENABLED);
if (event == null) {
- Event newEvent = new Event<>(context, type, reason, message, messageKey);
+ Event newEvent = new Event<>(context, reason, message, messageKey);
events.add(newEvent);
eventMap.put(eventKey, newEvent);
return;
- }
- if (Objects.equals(event.getMessage(), message)
+ } else if (((!scaled && Objects.equals(event.getMessage(), message))
+ || !Objects.equals(reason, SCALING_REPORT_REASON))
&& interval != null
&& Instant.now()
.isBefore(event.getLastUpdateTimestamp().plusMillis(interval.toMillis()))) {
@@ -63,6 +67,8 @@ public void handleEvent(
return;
}
event.incrementCount();
+ event.setMessage(message);
+ event.setLastUpdateTimestamp(Instant.now());
events.add(event);
}
@@ -73,29 +79,21 @@ private String generateEventKey(Context context, Type type, String reason, Strin
/** The collected event. */
public static class Event> {
- @Getter private Instant lastUpdateTimestamp;
+ @Getter @Setter private Instant lastUpdateTimestamp;
@Getter private final Context context;
- @Getter private final Type type;
-
@Getter private final String reason;
- @Getter private final String message;
+ @Getter @Setter private String message;
@Getter @Nullable private final String messageKey;
@Getter private int count;
- public Event(
- Context context,
- Type type,
- String reason,
- String message,
- @Nullable String messageKey) {
+ public Event(Context context, String reason, String message, @Nullable String messageKey) {
this.lastUpdateTimestamp = Instant.now();
this.context = context;
- this.type = type;
this.reason = reason;
this.message = message;
this.messageKey = messageKey;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
index 54fe84ae25..d83e2db4d0 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerEventHandler.java
@@ -17,19 +17,26 @@
package org.apache.flink.kubernetes.operator.autoscaler;
+import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import javax.annotation.Nullable;
import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
/** An event handler which posts events to the Kubernetes events API. */
public class KubernetesAutoScalerEventHandler
implements AutoScalerEventHandler {
+ public static final String PARALLELISM_MAP_KEY = "parallelismMap";
private final EventRecorder eventRecorder;
public KubernetesAutoScalerEventHandler(EventRecorder eventRecorder) {
@@ -44,25 +51,71 @@ public void handleEvent(
String message,
@Nullable String messageKey,
@Nullable Duration interval) {
- if (interval == null) {
- eventRecorder.triggerEvent(
- context.getResource(),
- EventRecorder.Type.valueOf(type.name()),
- reason,
- message,
- EventRecorder.Component.Operator,
- messageKey,
- context.getKubernetesClient());
+ eventRecorder.triggerEventWithInterval(
+ context.getResource(),
+ EventRecorder.Type.valueOf(type.name()),
+ reason,
+ message,
+ EventRecorder.Component.Operator,
+ messageKey,
+ context.getKubernetesClient(),
+ interval);
+ }
+
+ @Override
+ public void handleScalingEvent(
+ KubernetesJobAutoScalerContext context,
+ Map scalingSummaries,
+ boolean scaled,
+ Duration interval) {
+ if (scaled) {
+ AutoScalerEventHandler.super.handleScalingEvent(
+ context, scalingSummaries, scaled, null);
} else {
- eventRecorder.triggerEventByInterval(
+ var conf = context.getConfiguration();
+ var scalingReport = AutoScalerEventHandler.scalingReport(scalingSummaries, scaled);
+ var labels = Map.of(PARALLELISM_MAP_KEY, getParallelismHashCode(scalingSummaries));
+
+ @Nullable
+ Predicate