From 74f1f7097cf9c68f325a34d6f55f14356780c47b Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 24 Jul 2024 13:31:49 -0400 Subject: [PATCH] Generic throttling metrics namespace and BigtableIO write throttling counter (#31924) * BigtableIO write throttling counter * Introduce a generic throttling namespace and counter * Dataflow accumulates throttling time from generic throttling counter * Apply throttling counter to BigtableIO write * default to 3 min for throttlingReportTargetMs --- .../worker/BatchModeExecutionContext.java | 26 +++-- .../worker/DataflowSystemMetrics.java | 5 +- .../worker/StreamingDataflowWorker.java | 12 +- .../dataflow/worker/streaming/StageInfo.java | 14 +-- .../worker/BatchModeExecutionContextTest.java | 3 +- .../org/apache/beam/sdk/metrics/Metrics.java | 7 ++ .../beam/sdk/metrics/MetricsEnvironment.java | 11 +- .../org/apache/beam/sdk/util/StringUtils.java | 35 ++++++ .../apache/beam/sdk/util/StringUtilsTest.java | 23 ++++ .../gcp/util/RetryHttpRequestInitializer.java | 2 +- .../io/gcp/bigquery/BigQueryServicesImpl.java | 4 +- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 106 +++++++++++++++++- .../io/gcp/bigtable/BigtableServiceImpl.java | 4 + .../io/gcp/bigtable/BigtableWriteOptions.java | 5 + .../sdk/io/gcp/datastore/DatastoreV1.java | 2 +- .../io/gcp/datastore/RampupThrottlingFn.java | 3 +- .../gcp/bigquery/BigQuerySinkMetricsTest.java | 4 +- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 15 +++ .../beam/sdk/io/synthetic/SyntheticStep.java | 2 +- 19 files changed, 233 insertions(+), 50 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index 8c038189ae62..41bbae7cfdb3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -40,6 +40,7 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.TimeDomain; @@ -68,9 +69,6 @@ public class BatchModeExecutionContext private Object key; private final MetricsContainerRegistry containerRegistry; - - // TODO(https://github.com/apache/beam/issues/19632): Move throttle time Metric to a dedicated - // namespace. protected static final String DATASTORE_THROTTLE_TIME_NAMESPACE = "org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$DatastoreWriterFn"; protected static final String HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE = @@ -79,7 +77,6 @@ public class BatchModeExecutionContext "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl"; protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE = "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl"; - protected static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs"; // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this. private final boolean populateStringSetMetrics; @@ -550,11 +547,18 @@ public Iterable extractMsecCounters(boolean isFinalUpdate) { public Long extractThrottleTime() { long totalThrottleMsecs = 0L; for (MetricsContainerImpl container : containerRegistry.getContainers()) { - // TODO(https://github.com/apache/beam/issues/19632): Update throttling counters to use - // generic throttling-msecs metric. 
+ CounterCell userThrottlingTime = + container.tryGetCounter( + MetricName.named( + Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME)); + if (userThrottlingTime != null) { + totalThrottleMsecs += userThrottlingTime.getCumulative(); + } + CounterCell dataStoreThrottlingTime = container.tryGetCounter( - MetricName.named(DATASTORE_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME)); + MetricName.named( + DATASTORE_THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME)); if (dataStoreThrottlingTime != null) { totalThrottleMsecs += dataStoreThrottlingTime.getCumulative(); } @@ -562,7 +566,7 @@ public Long extractThrottleTime() { CounterCell httpClientApiThrottlingTime = container.tryGetCounter( MetricName.named( - HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME)); + HTTP_CLIENT_API_THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME)); if (httpClientApiThrottlingTime != null) { totalThrottleMsecs += httpClientApiThrottlingTime.getCumulative(); } @@ -570,14 +574,16 @@ public Long extractThrottleTime() { CounterCell bigqueryStreamingInsertThrottleTime = container.tryGetCounter( MetricName.named( - BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME)); + BIGQUERY_STREAMING_INSERT_THROTTLE_TIME_NAMESPACE, + Metrics.THROTTLE_TIME_COUNTER_NAME)); if (bigqueryStreamingInsertThrottleTime != null) { totalThrottleMsecs += bigqueryStreamingInsertThrottleTime.getCumulative(); } CounterCell bigqueryReadThrottleTime = container.tryGetCounter( - MetricName.named(BIGQUERY_READ_THROTTLE_TIME_NAMESPACE, THROTTLE_TIME_COUNTER_NAME)); + MetricName.named( + BIGQUERY_READ_THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME)); if (bigqueryReadThrottleTime != null) { totalThrottleMsecs += bigqueryReadThrottleTime.getCumulative(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java index 640febc616ba..c5a24df192eb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSystemMetrics.java @@ -20,15 +20,14 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; /** This holds system metrics related constants used in Batch and Streaming. */ public class DataflowSystemMetrics { public static final MetricName THROTTLING_MSECS_METRIC_NAME = - MetricName.named("dataflow-throttling-metrics", "throttling-msecs"); - - // TODO: Provide an utility in SDK 'ThrottlingReporter' to update throttling time. + MetricName.named("dataflow-throttling-metrics", Metrics.THROTTLE_TIME_COUNTER_NAME); /** System counters populated by streaming dataflow workers. 
*/ public enum StreamingSystemCounterNames { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 0e46e7e4687e..718d93830c41 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -93,13 +93,13 @@ import org.apache.beam.sdk.fn.JvmInitializers; import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; -import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.*; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; @@ -113,14 +113,6 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class StreamingDataflowWorker { - - // TODO(https://github.com/apache/beam/issues/19632): Update throttling counters to use generic - // throttling-msecs metric. 
- public static final MetricName BIGQUERY_STREAMING_INSERT_THROTTLE_TIME = - MetricName.named( - "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl", - "throttling-msecs"); - /** * Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the amount of data sinked * (across all the sinks, if there are more than one) reaches this limit. This serves as hint for diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java index 8f14ea26a461..a18ca8cfd6dc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker.streaming; -import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME; +import static org.apache.beam.sdk.metrics.Metrics.THROTTLE_TIME_COUNTER_NAME; import com.google.api.services.dataflow.model.CounterStructuredName; import com.google.api.services.dataflow.model.CounterUpdate; @@ -28,7 +28,6 @@ import java.util.List; import org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics; import org.apache.beam.runners.dataflow.worker.MetricsContainerRegistry; -import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker; import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionStateRegistry; import org.apache.beam.runners.dataflow.worker.StreamingStepMetricsContainer; import org.apache.beam.runners.dataflow.worker.counters.Counter; @@ -93,20 +92,13 @@ public List extractCounterUpdates() { } /** - * Checks if the step counter affects any per-stage counters. 
Currently 'throttled_millis' is the + * Checks if the step counter affects any per-stage counters. Currently 'throttled-msecs' is the * only counter updated. */ private void translateKnownStepCounters(CounterUpdate stepCounterUpdate) { CounterStructuredName structuredName = stepCounterUpdate.getStructuredNameAndMetadata().getName(); - if ((THROTTLING_MSECS_METRIC_NAME.getNamespace().equals(structuredName.getOriginNamespace()) - && THROTTLING_MSECS_METRIC_NAME.getName().equals(structuredName.getName())) - || (StreamingDataflowWorker.BIGQUERY_STREAMING_INSERT_THROTTLE_TIME - .getNamespace() - .equals(structuredName.getOriginNamespace()) - && StreamingDataflowWorker.BIGQUERY_STREAMING_INSERT_THROTTLE_TIME - .getName() - .equals(structuredName.getName()))) { + if (THROTTLE_TIME_COUNTER_NAME.equals(structuredName.getName())) { long msecs = DataflowCounterUpdateExtractor.splitIntToLong(stepCounterUpdate.getInteger()); if (msecs > 0) { throttledMsecs().addValue(msecs); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java index 18bd814b4df7..4062fbf6ebed 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java @@ -43,6 +43,7 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.StringSet; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -266,7 +267,7 @@ public void extractThrottleTimeCounters() { 
.getCounter( MetricName.named( BatchModeExecutionContext.DATASTORE_THROTTLE_TIME_NAMESPACE, - BatchModeExecutionContext.THROTTLE_TIME_COUNTER_NAME)); + Metrics.THROTTLE_TIME_COUNTER_NAME)); counter.inc(12000); counter.inc(17000); counter.inc(1000); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java index 916e18647c34..dc80a66c0550 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -109,6 +109,13 @@ public static StringSet stringSet(Class namespace, String name) { return new DelegatingStringSet(MetricName.named(namespace, name)); } + /* + * A dedicated namespace for client throttling time. User DoFn can increment this metrics and then + * runner will put back pressure on scaling decision, if supported. + */ + public static final String THROTTLE_TIME_NAMESPACE = "beam-throttling-metrics"; + public static final String THROTTLE_TIME_COUNTER_NAME = "throttling-msecs"; + /** * Implementation of {@link Distribution} that delegates to the instance for the current context. 
*/ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java index 7f8f2a436433..3421bb4afc85 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.util.StringUtils; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; @@ -134,10 +135,14 @@ public void close() throws IOException { if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false, true)) { if (isMetricsSupported()) { LOG.error( - "Unable to update metrics on the current thread. " - + "Most likely caused by using metrics outside the managed work-execution thread."); + "Unable to update metrics on the current thread. 
Most likely caused by using metrics " + + "outside the managed work-execution thread:\n {}", + StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10)); } else { - LOG.warn("Reporting metrics are not supported in the current execution environment."); + // rate limiting this log as it can be emitted each time metrics incremented + LOG.warn( + "Reporting metrics are not supported in the current execution environment:\n {}", + StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10)); } } return container; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java index 13105fb6c02c..ccd58857da04 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.annotations.Internal; +import org.checkerframework.checker.nullness.qual.Nullable; /** Utilities for working with JSON and other human-readable string formats. */ @Internal @@ -143,4 +144,38 @@ public static int getLevenshteinDistance(final String s, final String t) { return v1[t.length()]; } + + /** + * Convert Array to new lined String. Truncate to first {@code maxLine} elements. + * + *

Useful to truncate stacktrace and for logging. + */ + public static String arrayToNewlines(Object[] array, int maxLine) { + int n = (maxLine > 0 && array.length > maxLine) ? maxLine : array.length; + StringBuilder b = new StringBuilder(); + for (int i = 0; i < n; i++) { + b.append(array[i]); + b.append("\n"); + } + if (array.length > maxLine) { + b.append("...\n"); + } + return b.toString(); + } + + /** + * Truncate String if length greater than maxLen, and append "..." to the end. Handles null. + * + *

Useful to truncate long logging message. + */ + public static String leftTruncate(@Nullable Object element, int maxLen) { + if (element == null) { + return ""; + } + String s = element.toString(); + if (s.length() > maxLen) { + return s.substring(0, maxLen) + "..."; + } + return s; + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java index 9e9686ca2011..e8b0e7ecd470 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java @@ -17,9 +17,13 @@ */ package org.apache.beam.sdk.util; +import static org.apache.commons.lang3.StringUtils.countMatches; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.util.UUID; +import java.util.stream.IntStream; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -54,4 +58,23 @@ public void testLevenshteinDistance() { assertEquals(1, StringUtils.getLevenshteinDistance("abc", "ab1c")); // insertion assertEquals(1, StringUtils.getLevenshteinDistance("abc", "a1c")); // modification } + + @Test + public void testArrayToNewlines() { + Object[] uuids = IntStream.range(1, 10).mapToObj(unused -> UUID.randomUUID()).toArray(); + + String r1 = StringUtils.arrayToNewlines(uuids, 6); + assertTrue(r1.endsWith("...\n")); + assertEquals(7, countMatches(r1, "\n")); + String r2 = StringUtils.arrayToNewlines(uuids, 15); + String r3 = StringUtils.arrayToNewlines(uuids, 10); + assertEquals(r3, r2); + } + + @Test + public void testLeftTruncate() { + assertEquals("", StringUtils.leftTruncate(null, 3)); + assertEquals("", StringUtils.leftTruncate("", 3)); + assertEquals("abc...", StringUtils.leftTruncate("abcd", 3)); + } } diff --git 
a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java index d053a5f4bf80..b48dc6368050 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/RetryHttpRequestInitializer.java @@ -75,7 +75,7 @@ private static class LoggingHttpBackOffHandler private final Set ignoredResponseCodes; // aggregate the total time spent in exponential backoff private final Counter throttlingMsecs = - Metrics.counter(LoggingHttpBackOffHandler.class, "throttling-msecs"); + Metrics.counter(LoggingHttpBackOffHandler.class, Metrics.THROTTLE_TIME_COUNTER_NAME); private int ioExceptionRetries; private int unsuccessfulResponseRetries; private @Nullable CustomHttpErrors customHttpErrors; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index c6b0e17e59db..b87b6a222a4d 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -575,7 +575,7 @@ public static class DatasetServiceImpl implements DatasetService { private final long maxRowBatchSize; // aggregate the total time spent in exponential backoff private final Counter throttlingMsecs = - Metrics.counter(DatasetServiceImpl.class, "throttling-msecs"); + Metrics.counter(DatasetServiceImpl.class, Metrics.THROTTLE_TIME_COUNTER_NAME); private @Nullable 
BoundedExecutorService executor; @@ -1663,7 +1663,7 @@ public void cancel() { static class StorageClientImpl implements StorageClient { public static final Counter THROTTLING_MSECS = - Metrics.counter(StorageClientImpl.class, "throttling-msecs"); + Metrics.counter(StorageClientImpl.class, Metrics.THROTTLE_TIME_COUNTER_NAME); private transient long unreportedDelay = 0L; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index d25ad7d4871d..d78ae2cb6c57 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -21,13 +21,16 @@ import static org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import com.google.api.gax.batching.BatchingException; import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.DeadlineExceededException; import com.google.api.gax.rpc.InvalidArgumentException; import com.google.api.gax.rpc.NotFoundException; +import com.google.api.gax.rpc.ResourceExhaustedException; import com.google.auto.value.AutoValue; import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.Mutation; @@ -38,6 +41,7 @@ import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import 
com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.protobuf.ByteString; +import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -69,6 +73,8 @@ import org.apache.beam.sdk.io.range.ByteKey; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; @@ -82,6 +88,7 @@ import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.StringUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -1109,12 +1116,51 @@ public Write withMaxOutstandingBytes(long bytes) { * always enabled on batch writes and limits the number of outstanding requests to the Bigtable * server. * + *

When enabled, also sets the default {@link #withThrottlingReportTargetMs} to 1 minute. + * This enables the runner to react to increased latency in flush calls due to flow control. + * *

Does not modify this object. */ public Write withFlowControl(boolean enableFlowControl) { + BigtableWriteOptions options = getBigtableWriteOptions(); + BigtableWriteOptions.Builder builder = options.toBuilder().setFlowControl(enableFlowControl); + if (enableFlowControl) { + builder = builder.setThrottlingReportTargetMs(60_000); + } + return toBuilder().setBigtableWriteOptions(builder.build()).build(); + } + + /** + * Returns a new {@link BigtableIO.Write} with client side latency based throttling enabled. + * + *

Will also set {@link #withThrottlingReportTargetMs} to the same value. + */ + public Write withThrottlingTargetMs(int throttlingTargetMs) { + BigtableWriteOptions options = getBigtableWriteOptions(); + return toBuilder() + .setBigtableWriteOptions( + options + .toBuilder() + .setThrottlingTargetMs(throttlingTargetMs) + .setThrottlingReportTargetMs(throttlingTargetMs) + .build()) + .build(); + } + + /** + * Returns a new {@link BigtableIO.Write} with throttling time reporting enabled. When write + * request latency exceeded the set value, the amount greater than the target will be considered + * as throttling time and report back to runner. + * + *

If not set, defaults to 3 min for completed batch request. Client side flowing control + * configurations (e.g. {@link #withFlowControl}, {@link #withThrottlingTargetMs} will adjust + * the default value accordingly. Set to 0 to disable throttling time reporting. + */ + public Write withThrottlingReportTargetMs(int throttlingReportTargetMs) { BigtableWriteOptions options = getBigtableWriteOptions(); return toBuilder() - .setBigtableWriteOptions(options.toBuilder().setFlowControl(enableFlowControl).build()) + .setBigtableWriteOptions( + options.toBuilder().setThrottlingReportTargetMs(throttlingReportTargetMs).build()) .build(); } @@ -1283,7 +1329,14 @@ private static class BigtableWriterFn private final Coder>> inputCoder; private final BadRecordRouter badRecordRouter; + private final Counter throttlingMsecs = + Metrics.counter(Metrics.THROTTLE_TIME_NAMESPACE, Metrics.THROTTLE_TIME_COUNTER_NAME); + + private final int throttleReportThresMsecs; + private transient Set> badRecords = null; + // Due to callback thread not supporting Beam metrics, Record pending metrics and report later. + private transient long pendingThrottlingMsecs; // Assign serviceEntry in startBundle and clear it in tearDown. @Nullable private BigtableServiceEntry serviceEntry; @@ -1301,6 +1354,8 @@ private static class BigtableWriterFn this.badRecordRouter = badRecordRouter; this.failures = new ConcurrentLinkedQueue<>(); this.id = factory.newId(); + // a request completed more than this time will be considered throttled. 
Disabled if set to 0 + throttleReportThresMsecs = firstNonNull(writeOptions.getThrottlingReportTargetMs(), 180_000); LOG.debug("Created Bigtable Write Fn with writeOptions {} ", writeOptions); } @@ -1322,20 +1377,52 @@ public void startBundle(StartBundleContext c) throws IOException { public void processElement(ProcessContext c, BoundedWindow window) throws Exception { checkForFailures(); KV> record = c.element(); - bigtableWriter.writeRecord(record).whenComplete(handleMutationException(record, window)); + Instant writeStart = Instant.now(); + pendingThrottlingMsecs = 0; + bigtableWriter + .writeRecord(record) + .whenComplete(handleMutationException(record, window, writeStart)); + if (pendingThrottlingMsecs > 0) { + throttlingMsecs.inc(pendingThrottlingMsecs); + } ++recordsWritten; seenWindows.compute(window, (key, count) -> (count != null ? count : 0) + 1); } private BiConsumer handleMutationException( - KV> record, BoundedWindow window) { + KV> record, BoundedWindow window, Instant writeStart) { return (MutateRowResponse result, Throwable exception) -> { if (exception != null) { if (isDataException(exception)) { retryIndividualRecord(record, window); } else { + // Exception due to resource unavailable or rate limited, + // including DEADLINE_EXCEEDED and RESOURCE_EXHAUSTED. 
+ boolean isResourceException = false; + if (exception instanceof StatusRuntimeException) { + StatusRuntimeException se = (StatusRuntimeException) exception; + if (io.grpc.Status.DEADLINE_EXCEEDED.equals(se.getStatus()) + || io.grpc.Status.RESOURCE_EXHAUSTED.equals(se.getStatus())) { + isResourceException = true; + } + } else if (exception instanceof DeadlineExceededException + || exception instanceof ResourceExhaustedException) { + isResourceException = true; + } + if (isResourceException) { + pendingThrottlingMsecs = new Duration(writeStart, Instant.now()).getMillis(); + } failures.add(new BigtableWriteException(record, exception)); } + } else { + // add the excessive amount to throttling metrics if elapsed time > target latency + if (throttleReportThresMsecs > 0) { + long excessTime = + new Duration(writeStart, Instant.now()).getMillis() - throttleReportThresMsecs; + if (excessTime > 0) { + pendingThrottlingMsecs = excessTime; + } + } } }; } @@ -1371,8 +1458,8 @@ private static boolean isDataException(Throwable e) { @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { try { - if (bigtableWriter != null) { + Instant closeStart = Instant.now(); try { bigtableWriter.close(); } catch (IOException e) { @@ -1381,9 +1468,18 @@ public void finishBundle(FinishBundleContext c) throws Exception { // to the error queue. 
Bigtable will successfully write other failures in the batch, // so this exception should be ignored if (!(e.getCause() instanceof BatchingException)) { + throttlingMsecs.inc(new Duration(closeStart, Instant.now()).getMillis()); throw e; } } + // add the excessive amount to throttling metrics if elapsed time > target latency + if (throttleReportThresMsecs > 0) { + long excessTime = + new Duration(closeStart, Instant.now()).getMillis() - throttleReportThresMsecs; + if (excessTime > 0) { + throttlingMsecs.inc(excessTime); + } + } bigtableWriter = null; } @@ -2015,7 +2111,7 @@ public BigtableWriteException(KV> record, Throwab super( String.format( "Error mutating row %s with mutations %s", - record.getKey().toStringUtf8(), record.getValue()), + record.getKey().toStringUtf8(), StringUtils.leftTruncate(record.getValue(), 100)), cause); this.record = record; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index 06e0108259d5..10cfa724c2ad 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -24,6 +24,7 @@ import com.google.api.gax.batching.BatchingException; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.DeadlineExceededException; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.StreamController; @@ -611,6 +612,9 @@ public void onFailure(Throwable throwable) { if (throwable instanceof StatusRuntimeException) { serviceCallMetric.call( ((StatusRuntimeException) throwable).getStatus().getCode().value()); + } else if (throwable instanceof 
DeadlineExceededException) { + // incoming throwable can be a StatusRuntimeException or a specific grpc ApiException + serviceCallMetric.call(504); } else { serviceCallMetric.call("unknown"); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java index a63cc575809b..5963eb6be3ce 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteOptions.java @@ -57,6 +57,9 @@ abstract class BigtableWriteOptions implements Serializable { /** Returns the target latency if latency based throttling is enabled. */ abstract @Nullable Integer getThrottlingTargetMs(); + /** Returns the target latency if latency based throttling report to runner is enabled. */ + abstract @Nullable Integer getThrottlingReportTargetMs(); + /** Returns true if batch write flow control is enabled. Otherwise return false. 
*/ abstract @Nullable Boolean getFlowControl(); @@ -88,6 +91,8 @@ abstract static class Builder { abstract Builder setThrottlingTargetMs(int targetMs); + abstract Builder setThrottlingReportTargetMs(int targetMs); + abstract Builder setFlowControl(boolean enableFlowControl); abstract Builder setCloseWaitTimeout(Duration timeout); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java index 86cd7a3439aa..1563b0b059f2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java @@ -1711,7 +1711,7 @@ static class DatastoreWriterFn extends DoFn { private WriteBatcher writeBatcher; private transient AdaptiveThrottler adaptiveThrottler; private final Counter throttlingMsecs = - Metrics.counter(DatastoreWriterFn.class, "throttling-msecs"); + Metrics.counter(DatastoreWriterFn.class, Metrics.THROTTLE_TIME_COUNTER_NAME); private final Counter rpcErrors = Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors"); private final Counter rpcSuccesses = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java index db098c0a5166..ae94d4b612d0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RampupThrottlingFn.java @@ -53,7 +53,8 @@ public class RampupThrottlingFn extends DoFn implements Serializable { private final PCollectionView firstInstantSideInput; @VisibleForTesting - Counter throttlingMsecs = 
Metrics.counter(RampupThrottlingFn.class, "throttling-msecs"); + Counter throttlingMsecs = + Metrics.counter(RampupThrottlingFn.class, Metrics.THROTTLE_TIME_COUNTER_NAME); // Initialized on every setup. private transient MovingFunction successfulOps; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java index 50660326275c..8695a445c118 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetricsTest.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Histogram; import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.util.HistogramData; import org.apache.beam.sdk.values.KV; @@ -178,7 +179,8 @@ public void testThrottledTimeCounter() throws Exception { testContainer.assertPerWorkerCounterValue(counterName, 1L); counterName = - MetricName.named(BigQueryServicesImpl.StorageClientImpl.class, "throttling-msecs"); + MetricName.named( + BigQueryServicesImpl.StorageClientImpl.class, Metrics.THROTTLE_TIME_COUNTER_NAME); assertEquals(1L, (long) testContainer.getCounter(counterName).getCumulative()); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index dd6a55ff4378..e5049b037010 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -436,6 +436,21 @@ public void testWriteValidationFailsMissingOptionsAndInstanceAndProject() { write.expand(null); } + @Test + public void testWriteClientRateLimitingAlsoSetReportMsecs() { + // client side flow control + BigtableIO.Write write = BigtableIO.write().withTableId("table").withFlowControl(true); + assertEquals( + 60_000, (int) checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs())); + + // client side latency based throttling + int targetMs = 30_000; + write = BigtableIO.write().withTableId("table").withThrottlingTargetMs(targetMs); + assertEquals( + targetMs, + (int) checkNotNull(write.getBigtableWriteOptions().getThrottlingReportTargetMs())); + } + /** Helper function to make a single row mutation to be written. */ private static KV> makeWrite(String key, String value) { ByteString rowKey = ByteString.copyFromUtf8(key); diff --git a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java index d32640ffbf7d..98db23c95a38 100644 --- a/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java +++ b/sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticStep.java @@ -58,7 +58,7 @@ public class SyntheticStep extends DoFn, KV> private final KV idAndThroughput; private final Counter throttlingCounter = - Metrics.counter("dataflow-throttling-metrics", "throttling-msecs"); + Metrics.counter("dataflow-throttling-metrics", Metrics.THROTTLE_TIME_COUNTER_NAME); /** * Static cache to store one worker level rate limiter for a step. Value in KV is the desired