Skip to content

Commit

Permalink
[BEAM-14035 ] Implement BigQuerySchema Read/Write TransformProvider (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
damondouglas authored May 16, 2022
1 parent 341a836 commit ee5888d
Show file tree
Hide file tree
Showing 8 changed files with 1,153 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@
@Experimental(Kind.SCHEMAS)
public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTransformProvider {

abstract Class<ConfigT> configurationClass();
protected abstract Class<ConfigT> configurationClass();

/**
* Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a
* {@link InvalidSchemaException}.
*/
abstract SchemaTransform from(ConfigT configuration);
protected abstract SchemaTransform from(ConfigT configuration);

/**
* List the dependencies needed for this transform. Jars from classpath are used by default when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public String identifier() {
}

@Override
Class<Configuration> configurationClass() {
protected Class<Configuration> configurationClass() {
return Configuration.class;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * Settings describing a BigQuery read job.
 *
 * <p>Intended to be consumed by {@link BigQuerySchemaTransformReadProvider}. Every property is
 * declared {@code @Nullable}.
 *
 * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
 * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
 * repository.
 */
@SuppressWarnings({
  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class BigQuerySchemaTransformReadConfiguration {

  /** Converts an instance of this configuration into a {@link Row}; built once per class. */
  private static final SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
      TO_ROW_FUNCTION =
          new AutoValueSchema()
              .toRowFunction(TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class));

  /** Creates a fresh {@link BigQuerySchemaTransformReadConfiguration.Builder}. */
  public static Builder builder() {
    return new AutoValue_BigQuerySchemaTransformReadConfiguration.Builder();
  }

  /** Returns this configuration converted to a {@link Row}. */
  Row toBeamRow() {
    return TO_ROW_FUNCTION.apply(this);
  }

  /** The SQL query the BigQuery read job should run. */
  @Nullable
  public abstract String getQuery();

  /**
   * The table the BigQuery read job should read from. The expected format is described in {@link
   * BigQueryIO.TypedRead#from(String)}.
   */
  @Nullable
  public abstract String getTableSpec();

  /** The BigQuery geographic location in which the query job will be executed. */
  @Nullable
  public abstract String getQueryLocation();

  /** Whether BigQuery's Standard SQL dialect is enabled when reading from a query. */
  @Nullable
  public abstract Boolean getUseStandardSql();

  /** Builder for {@link BigQuerySchemaTransformReadConfiguration}. */
  @AutoValue.Builder
  public abstract static class Builder {

    /** Sets the SQL query the BigQuery read job should run. */
    public abstract Builder setQuery(String value);

    /**
     * Sets the table the BigQuery read job should read from. The expected format is described in
     * {@link BigQueryIO.TypedRead#from(String)}.
     */
    public abstract Builder setTableSpec(String value);

    /** Sets the BigQuery geographic location in which the query job will be executed. */
    public abstract Builder setQueryLocation(String value);

    /** Sets whether BigQuery's Standard SQL dialect is enabled when reading from a query. */
    public abstract Builder setUseStandardSql(Boolean value);

    /** Produces the {@link BigQuerySchemaTransformReadConfiguration} from the values set so far. */
    public abstract BigQuerySchemaTransformReadConfiguration build();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;

/**
 * A {@link TypedSchemaTransformProvider} that turns a {@link
 * BigQuerySchemaTransformReadConfiguration} into a BigQuery read transform.
 *
 * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
 * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
 * repository.
 */
@SuppressWarnings({
  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
@Internal
@Experimental(Kind.SCHEMAS)
public class BigQuerySchemaTransformReadProvider
    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {

  private static final String API = "bigquery";
  private static final String OUTPUT_TAG = "OUTPUT";

  /** Reports {@link BigQuerySchemaTransformReadConfiguration} as the configuration class. */
  @Override
  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
    return BigQuerySchemaTransformReadConfiguration.class;
  }

  /** Wraps the given configuration in a {@link SchemaTransform}. */
  @Override
  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
    return new BigQueryReadSchemaTransform(configuration);
  }

  /** Returns the identifier of this provider: the API name followed by {@code :read}. */
  @Override
  public String identifier() {
    return String.format("%s:read", API);
  }

  /** Returns an empty list, because the read transform takes no input collections. */
  @Override
  public List<String> inputCollectionNames() {
    return Collections.emptyList();
  }

  /** Returns a one-element list naming the single output collection. */
  @Override
  public List<String> outputCollectionNames() {
    return Collections.singletonList(OUTPUT_TAG);
  }

  /**
   * The {@link SchemaTransform} handed out by this provider; it keeps the configuration and builds
   * a {@link PCollectionRowTupleTransform} from it on request.
   */
  private static class BigQueryReadSchemaTransform implements SchemaTransform {
    private final BigQuerySchemaTransformReadConfiguration configuration;

    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration configuration) {
      this.configuration = configuration;
    }

    /** Creates the {@link PTransform} that performs the configured read. */
    @Override
    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
      return new PCollectionRowTupleTransform(configuration);
    }
  }

  /**
   * The {@link PTransform} that applies a {@link BigQueryIO.TypedRead} built from a {@link
   * BigQuerySchemaTransformReadConfiguration} and emits the result as Beam {@link Row}s.
   */
  static class PCollectionRowTupleTransform
      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {

    private final BigQuerySchemaTransformReadConfiguration configuration;

    /** Optional {@link BigQueryServices} override used for testing; {@code null} when unset. */
    private BigQueryServices testBigQueryServices = null;

    PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration configuration) {
      this.configuration = configuration;
    }

    /** Installs the {@link BigQueryServices} that {@link #expand} passes to the read. */
    @VisibleForTesting
    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
      this.testBigQueryServices = testBigQueryServices;
    }

    /**
     * Applies the configured BigQuery read to the pipeline of {@code input} and converts each
     * {@link TableRow} into a {@link Row} using the schema of the read's output.
     *
     * @param input must contain no collections
     * @return a tuple holding the resulting rows under the output tag
     * @throws IllegalArgumentException if {@code input} contains any collection
     */
    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
      boolean hasInputs = !input.getAll().isEmpty();
      if (hasInputs) {
        String message =
            String.format(
                "%s %s input is expected to be empty",
                input.getClass().getSimpleName(), getClass().getSimpleName());
        throw new IllegalArgumentException(message);
      }

      // With test services installed, route the read through them and skip validation.
      BigQueryIO.TypedRead<TableRow> configuredRead = toTypedRead();
      BigQueryIO.TypedRead<TableRow> effectiveRead =
          testBigQueryServices == null
              ? configuredRead
              : configuredRead.withTestServices(testBigQueryServices).withoutValidation();

      PCollection<TableRow> tableRows = input.getPipeline().apply(effectiveRead);
      Schema beamSchema = tableRows.getSchema();
      PCollection<Row> beamRows =
          tableRows.apply(
              MapElements.into(TypeDescriptor.of(Row.class))
                  .via(element -> BigQueryUtils.toBeamRow(beamSchema, element)));
      return PCollectionRowTuple.of(OUTPUT_TAG, beamRows.setRowSchema(beamSchema));
    }

    /**
     * Translates the configuration into a {@link BigQueryIO.TypedRead}. Null or empty string
     * properties, and a null or false Standard SQL flag, leave the read untouched.
     */
    BigQueryIO.TypedRead<TableRow> toTypedRead() {
      BigQueryIO.TypedRead<TableRow> typedRead = BigQueryIO.readTableRowsWithSchema();

      String query = configuration.getQuery();
      if (!Strings.isNullOrEmpty(query)) {
        typedRead = typedRead.fromQuery(query);
      }

      String tableSpec = configuration.getTableSpec();
      if (!Strings.isNullOrEmpty(tableSpec)) {
        typedRead = typedRead.from(tableSpec);
      }

      if (Boolean.TRUE.equals(configuration.getUseStandardSql())) {
        typedRead = typedRead.usingStandardSql();
      }

      String queryLocation = configuration.getQueryLocation();
      if (!Strings.isNullOrEmpty(queryLocation)) {
        typedRead = typedRead.withQueryLocation(queryLocation);
      }

      return typedRead;
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * Settings describing a BigQuery write.
 *
 * <p>Intended to be consumed by {@link BigQuerySchemaTransformWriteProvider}.
 *
 * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
 * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
 * repository.
 */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class BigQuerySchemaTransformWriteConfiguration {

  /** Converts an instance of this configuration into a {@link Row}; built once per class. */
  private static final SerializableFunction<BigQuerySchemaTransformWriteConfiguration, Row>
      TO_ROW_FUNCTION =
          new AutoValueSchema()
              .toRowFunction(TypeDescriptor.of(BigQuerySchemaTransformWriteConfiguration.class));

  /** Creates a fresh {@link BigQuerySchemaTransformWriteConfiguration.Builder}. */
  public static Builder builder() {
    return new AutoValue_BigQuerySchemaTransformWriteConfiguration.Builder();
  }

  /**
   * The table specification to write to. The expected format is described in {@link
   * BigQueryIO.Write#to(String)}.
   */
  public abstract String getTableSpec();

  /** Whether the table should be created if it does not exist. */
  public abstract String getCreateDisposition();

  /** What to do with existing data in the table, in case the table already exists. */
  public abstract String getWriteDisposition();

  /** Returns this configuration converted to a {@link Row}. */
  Row toBeamRow() {
    return TO_ROW_FUNCTION.apply(this);
  }

  /** Builder for {@link BigQuerySchemaTransformWriteConfiguration}. */
  @AutoValue.Builder
  public abstract static class Builder {

    /**
     * Sets the table specification to write to. The expected format is described in {@link
     * BigQueryIO.Write#to(String)}.
     */
    public abstract Builder setTableSpec(String value);

    /** Sets whether the table should be created if it does not exist. */
    public abstract Builder setCreateDisposition(String value);

    /** Sets what to do with existing data in the table, in case the table already exists. */
    public abstract Builder setWriteDisposition(String value);

    /** Produces the {@link BigQuerySchemaTransformWriteConfiguration} from the values set so far. */
    public abstract BigQuerySchemaTransformWriteConfiguration build();
  }
}
Loading

0 comments on commit ee5888d

Please sign in to comment.