Skip to content

Commit

Permalink
Generic throttling metrics namespace and BigtableIO write throttling …
Browse files Browse the repository at this point in the history
…counter (#31924)

* BigtableIO write throttling counter

* Introduce a generic throttling namespace and counter

* Dataflow accumulates throttling time from generic throttling counter

* Apply throttling counter to BigtableIO write

* default to 3 min for throttlingReportTargetMs
  • Loading branch information
Abacn authored Jul 24, 2024
1 parent 799405c commit 74f1f70
Show file tree
Hide file tree
Showing 19 changed files with 233 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
Expand Down Expand Up @@ -68,9 +69,6 @@ public class BatchModeExecutionContext
private Object key;

private final MetricsContainerRegistry<MetricsContainerImpl> containerRegistry;

// TODO(https://github.com/apache/beam/issues/19632): Move throttle time Metric to a dedicated
// namespace.
protected static final String DATASTORE_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$DatastoreWriterFn";
protected static final String HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE =
Expand All @@ -79,7 +77,6 @@ public class BatchModeExecutionContext
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl";
protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";
protected static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs";

// TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
private final boolean populateStringSetMetrics;
Expand Down Expand Up @@ -550,34 +547,43 @@ public Iterable<CounterUpdate> extractMsecCounters(boolean isFinalUpdate) {
public Long extractThrottleTime() {
long totalThrottleMsecs = 0L;
for (MetricsContainerImpl container : containerRegistry.getContainers()) {
// TODO(https://github.com/apache/beam/issues/19632): Update throttling counters to use
// generic throttling-msecs metric.
CounterCell userThrottlingTime =
container.tryGetCounter(
MetricName.named(
Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME));
if (userThrottlingTime != null) {
totalThrottleMsecs += userThrottlingTime.getCumulative();
}

CounterCell dataStoreThrottlingTime =
container.tryGetCounter(
MetricName.named(DATASTORE_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME));
MetricName.named(
DATASTORE_THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME));
if (dataStoreThrottlingTime != null) {
totalThrottleMsecs += dataStoreThrottlingTime.getCumulative();
}

CounterCell httpClientApiThrottlingTime =
container.tryGetCounter(
MetricName.named(
HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME));
HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME));
if (httpClientApiThrottlingTime != null) {
totalThrottleMsecs += httpClientApiThrottlingTime.getCumulative();
}

CounterCell bigqueryStreamingInsertThrottleTime =
container.tryGetCounter(
MetricName.named(
BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME));
BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE,
Metrics.THROTTLE_TIME_COUNTER_NAME));
if (bigqueryStreamingInsertThrottleTime != null) {
totalThrottleMsecs += bigqueryStreamingInsertThrottleTime.getCumulative();
}

CounterCell bigqueryReadThrottleTime =
container.tryGetCounter(
MetricName.named(BIGQUERY_READ_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME));
MetricName.named(
BIGQUERY_READ_THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME));
if (bigqueryReadThrottleTime != null) {
totalThrottleMsecs += bigqueryReadThrottleTime.getCumulative();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;

/** This holds system metrics related constants used in Batch and Streaming. */
public class DataflowSystemMetrics {

public static final MetricName THROTTLING_MSECS_METRIC_NAME =
MetricName.named("dataflow-throttling-metrics", "throttling-msecs");

// TODO: Provide an utility in SDK 'ThrottlingReporter' to update throttling time.
MetricName.named("dataflow-throttling-metrics", Metrics.THROTTLE_TIME_COUNTER_NAME);

/** System counters populated by streaming dataflow workers. */
public enum StreamingSystemCounterNames {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@
import org.apache.beam.sdk.fn.JvmInitializers;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.*;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
Expand All @@ -113,14 +113,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class StreamingDataflowWorker {

// TODO(https://github.com/apache/beam/issues/19632): Update throttling counters to use generic
// throttling-msecs metric.
public static final MetricName BIGQUERY_STREAMING_INSERT_THROTTLE_TIME =
MetricName.named(
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
"throttling-msecs");

/**
* Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the amount of data sinked
* (across all the sinks, if there are more than one) reaches this limit. This serves as hint for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.streaming;

import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME;
import static org.apache.beam.sdk.metrics.Metrics.THROTTLE_TIME_COUNTER_NAME;

import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterUpdate;
Expand All @@ -28,7 +28,6 @@
import java.util.List;
import org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics;
import org.apache.beam.runners.dataflow.worker.MetricsContainerRegistry;
import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionStateRegistry;
import org.apache.beam.runners.dataflow.worker.StreamingStepMetricsContainer;
import org.apache.beam.runners.dataflow.worker.counters.Counter;
Expand Down Expand Up @@ -93,20 +92,13 @@ public List<CounterUpdate> extractCounterUpdates() {
}

/**
* Checks if the step counter affects any per-stage counters. Currently 'throttled_millis' is the
* Checks if the step counter affects any per-stage counters. Currently 'throttled-msecs' is the
* only counter updated.
*/
private void translateKnownStepCounters(CounterUpdate stepCounterUpdate) {
CounterStructuredName structuredName =
stepCounterUpdate.getStructuredNameAndMetadata().getName();
if ((THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace())
&& THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName()))
|| (StreamingDataflowWorker.BIGQUERY_STREAMING_INSERT_THROTTLE_TIME
.getNamespace()
.equals(structuredName.getOriginNamespace())
&& StreamingDataflowWorker.BIGQUERY_STREAMING_INSERT_THROTTLE_TIME
.getName()
.equals(structuredName.getName()))) {
if (THROTTLE_TIME_COUNTER_NAME.equals(structuredName.getName())) {
long msecs = DataflowCounterUpdateExtractor.splitIntToLong(stepCounterUpdate.getInteger());
if (msecs > 0) {
throttledMsecs().addValue(msecs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand Down Expand Up @@ -266,7 +267,7 @@ public void extractThrottleTimeCounters() {
.getCounter(
MetricName.named(
BatchModeExecutionContext.DATASTORE_THROTTLE_TIME_NAMESPACE,
BatchModeExecutionContext.THROTTLE_TIME_COUNTER_NAME));
Metrics.THROTTLE_TIME_COUNTER_NAME));
counter.inc(12000);
counter.inc(17000);
counter.inc(1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ public static StringSet stringSet(Class<?> namespace, String name) {
return new DelegatingStringSet(MetricName.named(namespace, name));
}

/*
* A dedicated namespace for client throttling time. User DoFn can increment this metrics and then
* runner will put back pressure on scaling decision, if supported.
*/
public static final String THROTTLE_TIME_NAMESPACE = "beam-throttling-metrics";
public static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs";

/**
* Implementation of {@link Distribution} that delegates to the instance for the current context.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.util.StringUtils;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -134,10 +135,14 @@ public void close() throws IOException {
if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false, true)) {
if (isMetricsSupported()) {
LOG.error(
"Unable to update metrics on the current thread. "
+ "Most likely caused by using metrics outside the managed work-execution thread.");
"Unable to update metrics on the current thread. Most likely caused by using metrics "
+ "outside the managed work-execution thread:\n {}",
StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10));
} else {
LOG.warn("Reporting metrics are not supported in the current execution environment.");
// rate limiting this log as it can be emitted each time metrics incremented
LOG.warn(
"Reporting metrics are not supported in the current execution environment:\n {}",
StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10));
}
}
return container;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Utilities for working with JSON and other human-readable string formats. */
@Internal
Expand Down Expand Up @@ -143,4 +144,38 @@ public static int getLevenshteinDistance(final String s, final String t) {

return v1[t.length()];
}

/**
* Convert Array to new lined String. Truncate to first {@code maxLine} elements.
*
* <p>Useful to truncate stacktrace and for logging.
*/
public static String arrayToNewlines(Object[] array, int maxLine) {
int n = (maxLine > 0 && array.length > maxLine) ? maxLine : array.length;
StringBuilder b = new StringBuilder();
for (int i = 0; i < n; i++) {
b.append(array[i]);
b.append("\n");
}
if (array.length > maxLine) {
b.append("...\n");
}
return b.toString();
}

/**
* Truncate String if length greater than maxLen, and append "..." to the end. Handles null.
*
* <p>Useful to truncate long logging message.
*/
public static String leftTruncate(@Nullable Object element, int maxLen) {
if (element == null) {
return "";
}
String s = element.toString();
if (s.length() > maxLen) {
return s.substring(0, maxLen) + "...";
}
return s;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
*/
package org.apache.beam.sdk.util;

import static org.apache.commons.lang3.StringUtils.countMatches;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.UUID;
import java.util.stream.IntStream;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -54,4 +58,23 @@ public void testLevenshteinDistance() {
assertEquals(1, StringUtils.getLevenshteinDistance("abc", "ab1c")); // insertion
assertEquals(1, StringUtils.getLevenshteinDistance("abc", "a1c")); // modification
}

@Test
public void testArrayToNewlines() {
Object[] uuids = IntStream.range(1, 10).mapToObj(unused -> UUID.randomUUID()).toArray();

String r1 = StringUtils.arrayToNewlines(uuids, 6);
assertTrue(r1.endsWith("...\n"));
assertEquals(7, countMatches(r1, "\n"));
String r2 = StringUtils.arrayToNewlines(uuids, 15);
String r3 = StringUtils.arrayToNewlines(uuids, 10);
assertEquals(r3, r2);
}

@Test
public void testLeftTruncate() {
assertEquals("", StringUtils.leftTruncate(null, 3));
assertEquals("", StringUtils.leftTruncate("", 3));
assertEquals("abc...", StringUtils.leftTruncate("abcd", 3));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ private static class LoggingHttpBackOffHandler
private final Set<Integer> ignoredResponseCodes;
// aggregate the total time spent in exponential backoff
private final Counter throttlingMsecs =
Metrics.counter(LoggingHttpBackOffHandler.class, "throttling-msecs");
Metrics.counter(LoggingHttpBackOffHandler.class, Metrics.THROTTLE_TIME_COUNTER_NAME);
private int ioExceptionRetries;
private int unsuccessfulResponseRetries;
private @Nullable CustomHttpErrors customHttpErrors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ public static class DatasetServiceImpl implements DatasetService {
private final long maxRowBatchSize;
// aggregate the total time spent in exponential backoff
private final Counter throttlingMsecs =
Metrics.counter(DatasetServiceImpl.class, "throttling-msecs");
Metrics.counter(DatasetServiceImpl.class, Metrics.THROTTLE_TIME_COUNTER_NAME);

private @Nullable BoundedExecutorService executor;

Expand Down Expand Up @@ -1663,7 +1663,7 @@ public void cancel() {
static class StorageClientImpl implements StorageClient {

public static final Counter THROTTLING_MSECS =
Metrics.counter(StorageClientImpl.class, "throttling-msecs");
Metrics.counter(StorageClientImpl.class, Metrics.THROTTLE_TIME_COUNTER_NAME);

private transient long unreportedDelay = 0L;

Expand Down
Loading

0 comments on commit 74f1f70

Please sign in to comment.