Skip to content

Commit

Permalink
Allow setting BigQuery endpoint (apache#32153)
Browse files Browse the repository at this point in the history
* Allow setting BigQuery endpoint

* Changes.md update
  • Loading branch information
kberezin-nshl authored and reeba212 committed Dec 4, 2024
1 parent 8a6c7fd commit ce13ec2
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* BigQuery endpoint can be overridden via PipelineOptions, this enables BigQuery emulators (Java) ([#28149](https://github.com/apache/beam/issues/28149)).
* Go SDK Minimum Go Version updated to 1.21 ([#32092](https://github.com/apache/beam/pull/32092)).
* [BigQueryIO] Added support for withFormatRecordOnFailureFunction() for STORAGE_WRITE_API and STORAGE_API_AT_LEAST_ONCE methods (Java) ([#31354](https://github.com/apache/beam/issues/31354)).
* Updated Go protobuf package to new version (Go) ([#21515](https://github.com/apache/beam/issues/21515)).
Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ dependencies {
implementation library.java.http_client
implementation library.java.hamcrest
implementation library.java.http_core
implementation library.java.jackson_annotations
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation library.java.jackson_datatype_joda
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;

Expand Down Expand Up @@ -213,4 +215,12 @@ public interface BigQueryOptions
Boolean getEnableStorageReadApiV2();

void setEnableStorageReadApiV2(Boolean value);

/** BQ endpoint to use. If unspecified, uses the default endpoint. */
@JsonIgnore
@Hidden
@Description("The URL for the BigQuery API.")
String getBigQueryEndpoint();

void setBigQueryEndpoint(String value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand All @@ -158,6 +159,7 @@
"keyfor"
})
public class BigQueryServicesImpl implements BigQueryServices {

private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);

// The maximum number of retries to execute a BigQuery RPC.
Expand Down Expand Up @@ -215,6 +217,7 @@ private static BackOff createDefaultBackoff() {

@VisibleForTesting
static class JobServiceImpl implements BigQueryServices.JobService {

private final ApiErrorExtractor errorExtractor;
private final Bigquery client;
private final BigQueryIOMetadata bqIOMetadata;
Expand All @@ -226,12 +229,18 @@ static class JobServiceImpl implements BigQueryServices.JobService {
this.bqIOMetadata = BigQueryIOMetadata.create();
}

private JobServiceImpl(BigQueryOptions options) {
@VisibleForTesting
JobServiceImpl(BigQueryOptions options) {
this.errorExtractor = new ApiErrorExtractor();
this.client = newBigQueryClient(options).build();
this.bqIOMetadata = BigQueryIOMetadata.create();
}

@VisibleForTesting
Bigquery getClient() {
return client;
}

/**
* {@inheritDoc}
*
Expand Down Expand Up @@ -558,6 +567,7 @@ public void close() throws Exception {}

@VisibleForTesting
public static class DatasetServiceImpl implements DatasetService {

// Backoff: 200ms * 1.5 ^ n, n=[1,5]
private static final FluentBackoff INSERT_BACKOFF_FACTORY =
FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5);
Expand Down Expand Up @@ -610,6 +620,11 @@ public DatasetServiceImpl(BigQueryOptions bqOptions) {
this.executor = null;
}

@VisibleForTesting
Bigquery getClient() {
return client;
}

/**
* {@inheritDoc}
*
Expand Down Expand Up @@ -931,6 +946,7 @@ public void deleteDataset(String projectId, String datasetId)
}

static class InsertBatchofRowsCallable implements Callable<List<InsertErrors>> {

private final TableReference ref;
private final Boolean skipInvalidRows;
private final Boolean ignoreUnkownValues;
Expand Down Expand Up @@ -1359,6 +1375,7 @@ public void close() throws Exception {

@VisibleForTesting
public static class WriteStreamServiceImpl implements WriteStreamService {

private final BigQueryWriteClient newWriteClient;
private final long storageWriteMaxInflightRequests;
private final long storageWriteMaxInflightBytes;
Expand All @@ -1383,6 +1400,11 @@ public WriteStreamServiceImpl(BigQueryOptions bqOptions) {
this.bqIOMetadata = BigQueryIOMetadata.create();
}

@VisibleForTesting
BigQueryWriteClient getClient() {
return newWriteClient;
}

@Override
public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)
throws IOException {
Expand Down Expand Up @@ -1599,10 +1621,16 @@ private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) {
HttpRequestInitializer chainInitializer =
new ChainingHttpRequestInitializer(
Iterables.toArray(initBuilder.build(), HttpRequestInitializer.class));
return new Bigquery.Builder(
Transport.getTransport(), Transport.getJsonFactory(), chainInitializer)
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
Bigquery.Builder builder =
new Bigquery.Builder(Transport.getTransport(), Transport.getJsonFactory(), chainInitializer)
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());

@Nullable String endpoint = options.getBigQueryEndpoint();
if (!Strings.isNullOrEmpty(endpoint)) {
builder.setRootUrl(endpoint);
}
return builder;
}

private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions options) {
Expand All @@ -1615,8 +1643,13 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option
.setChannelsPerCpu(2)
.build();

BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder();
@Nullable String endpoint = options.getBigQueryEndpoint();
if (!Strings.isNullOrEmpty(endpoint)) {
builder.setEndpoint(trimSchemaIfNecessary(endpoint));
}
return BigQueryWriteClient.create(
BigQueryWriteSettings.newBuilder()
builder
.setCredentialsProvider(() -> options.as(GcpOptions.class).getGcpCredential())
.setTransportChannelProvider(transportChannelProvider)
.setBackgroundExecutorProvider(
Expand All @@ -1628,6 +1661,15 @@ private static BigQueryWriteClient newBigQueryWriteClient(BigQueryOptions option
}
}

private static String trimSchemaIfNecessary(String endpoint) {
if (endpoint.startsWith("http://")) {
return endpoint.substring("http://".length());
} else if (endpoint.startsWith("https://")) {
return endpoint.substring("https://".length());
}
return endpoint;
}

public static CustomHttpErrors createBigQueryClientCustomErrors() {
CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder();
// 403 errors, to list tables, matching this URL:
Expand Down Expand Up @@ -1725,6 +1767,10 @@ public void onRetryAttempt(Status status, Metadata metadata) {
.setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
.build())
.setReadRowsRetryAttemptListener(listener);
@Nullable String endpoint = options.getBigQueryEndpoint();
if (!Strings.isNullOrEmpty(endpoint)) {
settingsBuilder.setEndpoint(trimSchemaIfNecessary(endpoint));
}

UnaryCallSettings.Builder<CreateReadSessionRequest, ReadSession> createReadSessionSettings =
settingsBuilder.getStubSettingsBuilder().createReadSessionSettings();
Expand Down Expand Up @@ -1754,6 +1800,11 @@ public void onRetryAttempt(Status status, Metadata metadata) {
this.client = BigQueryReadClient.create(settingsBuilder.build());
}

@VisibleForTesting
BigQueryReadClient getClient() {
return client;
}

@VisibleForTesting
RetryAttemptCounter getListener() {
return listener;
Expand Down Expand Up @@ -1840,6 +1891,7 @@ public void close() {
}

private static class BoundedExecutorService {

private final ListeningExecutorService taskExecutor;
private final ListeningExecutorService taskSubmitExecutor;
private final Semaphore semaphore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
/** Tests for {@link BigQueryServicesImpl}. */
@RunWith(JUnit4.class)
public class BigQueryServicesImplTest {

@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryServicesImpl.class);
// A test can make mock responses through setupMockResponses
Expand Down Expand Up @@ -171,6 +172,7 @@ public LowLevelHttpResponse execute() throws IOException {

@FunctionalInterface
private interface MockSetupFunction {

void apply(LowLevelHttpResponse t) throws IOException;
}

Expand Down Expand Up @@ -2092,4 +2094,29 @@ public RetryInfo parseBytes(byte[] serialized) {
impl.reportPendingMetrics();
assertEquals(123456, (long) container.getCounter(metricName).getCumulative());
}

@Test
public void testEndpointOverrides() throws IOException {
BigQueryOptions options = PipelineOptionsFactory.create().as(BigQueryOptions.class);
options.setBigQueryEndpoint("http://example.com:80");

assertEquals(
"http://example.com:80/bigquery/v2/",
new BigQueryServicesImpl.JobServiceImpl(options).getClient().getBaseUrl());
assertEquals(
"http://example.com:80/bigquery/v2/",
new BigQueryServicesImpl.DatasetServiceImpl(options).getClient().getBaseUrl());
assertEquals(
"example.com:80",
new BigQueryServicesImpl.WriteStreamServiceImpl(options)
.getClient()
.getSettings()
.getEndpoint());
assertEquals(
"example.com:80",
new BigQueryServicesImpl.StorageClientImpl(options)
.getClient()
.getSettings()
.getEndpoint());
}
}

0 comments on commit ce13ec2

Please sign in to comment.