Skip to content

Commit

Permalink
Enable BigQueryIO write throttling detection (#31253)
Browse files Browse the repository at this point in the history
* BigQueryIO write throttling detection

* double retry interval
  • Loading branch information
Abacn authored May 21, 2024
1 parent f4119f5 commit 675dab2
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1653,7 +1653,7 @@ public void cancel() {

static class StorageClientImpl implements StorageClient {

public final Counter throttlingMsecs =
public static final Counter THROTTLING_MSECS =
Metrics.counter(StorageClientImpl.class, "throttling-msecs");

private transient long unreportedDelay = 0L;
Expand All @@ -1668,7 +1668,7 @@ public void reportPendingMetrics() {
unreportedDelay = 0L;

if (delay > 0) {
throttlingMsecs.inc(delay);
THROTTLING_MSECS.inc(delay);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.Instant;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.NestedCounter;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.Operation.Context;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.DelegatingCounter;
Expand Down Expand Up @@ -158,6 +159,7 @@ public static Counter appendRowsRowStatusCounter(
}

MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);

return new DelegatingCounter(metricName, false, true);
}

Expand All @@ -166,12 +168,20 @@ public static Counter appendRowsRowStatusCounter(
* @return Counter that tracks throttled time due to RPC retries.
*/
public static Counter throttledTimeCounter(RpcMethod method) {

LabeledMetricNameUtils.MetricNameBuilder nameBuilder =
LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(THROTTLED_TIME);
nameBuilder.addLabel(RPC_METHOD, method.toString());
MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);

return new DelegatingCounter(metricName, false, true);
// for specific method
Counter fineCounter = new DelegatingCounter(metricName, false, true);
// for overall throttling time, used by runner for scaling decision
Counter coarseCounter = BigQueryServicesImpl.StorageClientImpl.THROTTLING_MSECS;
return new NestedCounter(
MetricName.named(
METRICS_NAMESPACE, metricName.getName() + coarseCounter.getName().getName()),
fineCounter,
coarseCounter);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
Expand All @@ -65,6 +67,7 @@
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -1081,4 +1084,52 @@ private static Object convertAvroNumeric(Object value) {
public static ServiceCallMetric writeCallMetric(TableReference tableReference) {
return callMetricForMethod(tableReference, "BigQueryBatchWrite");
}

/**
* A counter holding a list of counters. Increment the counter will increment every sub-counter it
* holds.
*/
static class NestedCounter implements Counter, Serializable {

private final MetricName name;
private final ImmutableList<Counter> counters;

public NestedCounter(MetricName name, Counter... counters) {
this.name = name;
this.counters = ImmutableList.copyOf(counters);
}

@Override
public void inc() {
for (Counter counter : counters) {
counter.inc();
}
}

@Override
public void inc(long n) {
for (Counter counter : counters) {
counter.inc(n);
}
}

@Override
public void dec() {
for (Counter counter : counters) {
counter.dec();
}
}

@Override
public void dec(long n) {
for (Counter counter : counters) {
counter.dec(n);
}
}

@Override
public MetricName getName() {
return name;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -745,15 +745,22 @@ long flush(
quotaError = statusCode.equals(Status.Code.RESOURCE_EXHAUSTED);
}

int allowedRetry;

if (!quotaError) {
// This forces us to close and reopen all gRPC connections to Storage API on error,
// which empirically fixes random stuckness issues.
invalidateWriteStream();
allowedRetry = 5;
} else {
allowedRetry = 10;
}

// Maximum number of times we retry before we fail the work item.
if (failedContext.failureCount > 5) {
throw new RuntimeException("More than 5 attempts to call AppendRows failed.");
if (failedContext.failureCount > allowedRetry) {
throw new RuntimeException(
String.format(
"More than %d attempts to call AppendRows failed.", allowedRetry));
}

// The following errors are known to be persistent, so always fail the work item in
Expand Down Expand Up @@ -944,11 +951,12 @@ void flushAll(
long numRowsWritten = 0;
for (DestinationState destinationState :
Preconditions.checkStateNotNull(destinations).values()) {

RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
new RetryManager<>(
Duration.standardSeconds(1),
Duration.standardSeconds(10),
1000,
Duration.standardSeconds(20),
500,
BigQuerySinkMetrics.throttledTimeCounter(
BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
retryManagers.add(retryManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,8 @@ public void process(
RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
new RetryManager<>(
Duration.standardSeconds(1),
Duration.standardSeconds(10),
1000,
Duration.standardSeconds(20),
500,
BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
int numAppends = 0;
for (SplittingIterable.Value splitValue : messages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.Assert.assertEquals;

import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
Expand Down Expand Up @@ -169,11 +170,16 @@ public void testThrottledTimeCounter() throws Exception {
appendRowsThrottleCounter.inc(1);
assertThat(
appendRowsThrottleCounter.getName().getName(),
equalTo("ThrottledTime*rpc_method:APPEND_ROWS;"));
equalTo("ThrottledTime*rpc_method:APPEND_ROWS;throttling-msecs"));

// check that both sub-counters have been incremented
MetricName counterName =
MetricName.named("BigQuerySink", "ThrottledTime*rpc_method:APPEND_ROWS;");
testContainer.assertPerWorkerCounterValue(counterName, 1L);

counterName =
MetricName.named(BigQueryServicesImpl.StorageClientImpl.class, "throttling-msecs");
assertEquals(1L, (long) testContainer.getCounter(counterName).getCumulative());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.DatasetServiceImpl;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.NestedCounter;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand Down Expand Up @@ -212,4 +215,61 @@ public void testInsertAll() throws Exception {
// Each of the 25 rows has 1 byte for length and 30 bytes: '{"f":[{"v":"foo"},{"v":1234}]}'
assertEquals("Incorrect byte count", 25L * 31L, totalBytes);
}

static class ReadableCounter implements Counter {

private MetricName name;
private long value;

public ReadableCounter(MetricName name) {
this.name = name;
this.value = 0;
}

public long getValue() {
return value;
}

@Override
public void inc() {
++value;
}

@Override
public void inc(long n) {
value += n;
}

@Override
public void dec() {
--value;
}

@Override
public void dec(long n) {
value -= n;
}

@Override
public MetricName getName() {
return name;
}
}

@Test
public void testNestedCounter() {
MetricName name1 = MetricName.named(this.getClass(), "metric1");
MetricName name2 = MetricName.named(this.getClass(), "metric2");
ReadableCounter counter1 = new ReadableCounter(name1);
ReadableCounter counter2 = new ReadableCounter(name2);
NestedCounter nested =
new NestedCounter(MetricName.named(this.getClass(), "nested"), counter1, counter2);
counter1.inc();
nested.inc();
nested.inc(10);
nested.dec();
nested.dec(2);
assertEquals(9, counter1.getValue());
assertEquals(8, counter2.getValue());
}
}

0 comments on commit 675dab2

Please sign in to comment.