Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform #17607

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@
@Experimental(Kind.SCHEMAS)
public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTransformProvider {

abstract Class<ConfigT> configurationClass();
protected abstract Class<ConfigT> configurationClass();

/**
* Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a
* {@link InvalidSchemaException}.
*/
abstract SchemaTransform from(ConfigT configuration);
protected abstract SchemaTransform from(ConfigT configuration);

/**
* List the dependencies needed for this transform. Jars from classpath are used by default when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public String identifier() {
}

@Override
Class<Configuration> configurationClass() {
protected Class<Configuration> configurationClass() {
return Configuration.class;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* Configuration for reading from BigQuery.
*
* <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class BigQuerySchemaTransformReadConfiguration {

/** Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder}. */
public static Builder builder() {
return new AutoValue_BigQuerySchemaTransformReadConfiguration.Builder();
}

private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
private static final TypeDescriptor<BigQuerySchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class);
private static final SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);

/** Serializes configuration to a {@link Row}. */
Row toBeamRow() {
return ROW_SERIALIZABLE_FUNCTION.apply(this);
}

/** Configures the BigQuery read job with the SQL query. */
@Nullable
public abstract String getQuery();

/**
* Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)} for
* more details on the expected format.
*/
@Nullable
public abstract String getTableSpec();

/** BigQuery geographic location where the query job will be executed. */
@Nullable
public abstract String getQueryLocation();

/** Enables BigQuery's Standard SQL dialect when reading from a query. */
@Nullable
public abstract Boolean getUseStandardSql();

@AutoValue.Builder
public abstract static class Builder {

/** Configures the BigQuery read job with the SQL query. */
public abstract Builder setQuery(String value);

/**
* Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)} for
* more details on the expected format.
*/
public abstract Builder setTableSpec(String value);

/** BigQuery geographic location where the query job will be executed. */
public abstract Builder setQueryLocation(String value);

/** Enables BigQuery's Standard SQL dialect when reading from a query. */
public abstract Builder setUseStandardSql(Boolean value);

/** Builds the {@link BigQuerySchemaTransformReadConfiguration} configuration. */
public abstract BigQuerySchemaTransformReadConfiguration build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;

/**
* An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
* {@link BigQuerySchemaTransformReadConfiguration}.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
@Internal
@Experimental(Kind.SCHEMAS)
public class BigQuerySchemaTransformReadProvider
extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {

private static final String API = "bigquery";
private static final String OUTPUT_TAG = "OUTPUT";

/** Returns the expected class of the configuration. */
@Override
protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
return BigQuerySchemaTransformReadConfiguration.class;
}

/** Returns the expected {@link SchemaTransform} of the configuration. */
@Override
protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
return new BigQueryReadSchemaTransform(configuration);
}

/** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
@Override
public String identifier() {
return String.format("%s:read", API);
}

/**
* Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
* no input is expected, this returns an empty list.
*/
@Override
public List<String> inputCollectionNames() {
return Collections.emptyList();
}

/**
* Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
* a single output is expected, this returns a list with a single name.
*/
@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_TAG);
}

/**
* An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
* BigQuerySchemaTransformReadConfiguration}.
*/
private static class BigQueryReadSchemaTransform implements SchemaTransform {
private final BigQuerySchemaTransformReadConfiguration configuration;

BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration configuration) {
this.configuration = configuration;
}

/** Implements {@link SchemaTransform} buildTransform method. */
@Override
public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
return new PCollectionRowTupleTransform(configuration);
}
}

/**
* An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
* BigQuerySchemaTransformReadConfiguration}.
*/
static class PCollectionRowTupleTransform
extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {

private final BigQuerySchemaTransformReadConfiguration configuration;

/** An instance of {@link BigQueryServices} used for testing. */
private BigQueryServices testBigQueryServices = null;

PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration configuration) {
this.configuration = configuration;
}

@VisibleForTesting
void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
this.testBigQueryServices = testBigQueryServices;
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
if (!input.getAll().isEmpty()) {
throw new IllegalArgumentException(
String.format(
"%s %s input is expected to be empty",
input.getClass().getSimpleName(), getClass().getSimpleName()));
}

BigQueryIO.TypedRead<TableRow> read = toTypedRead();
if (testBigQueryServices != null) {
read = read.withTestServices(testBigQueryServices).withoutValidation();
}

PCollection<TableRow> tableRowPCollection = input.getPipeline().apply(read);
Schema schema = tableRowPCollection.getSchema();
PCollection<Row> rowPCollection =
tableRowPCollection.apply(
MapElements.into(TypeDescriptor.of(Row.class))
.via((tableRow) -> BigQueryUtils.toBeamRow(schema, tableRow)));
return PCollectionRowTuple.of(OUTPUT_TAG, rowPCollection.setRowSchema(schema));
}

BigQueryIO.TypedRead<TableRow> toTypedRead() {
BigQueryIO.TypedRead<TableRow> read = BigQueryIO.readTableRowsWithSchema();

if (!Strings.isNullOrEmpty(configuration.getQuery())) {
read = read.fromQuery(configuration.getQuery());
}

if (!Strings.isNullOrEmpty(configuration.getTableSpec())) {
read = read.from(configuration.getTableSpec());
}

if (configuration.getUseStandardSql() != null && configuration.getUseStandardSql()) {
read = read.usingStandardSql();
}

if (!Strings.isNullOrEmpty(configuration.getQueryLocation())) {
read = read.withQueryLocation(configuration.getQueryLocation());
}

return read;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
* Configuration for writing to BigQuery.
*
* <p>This class is meant to be used with {@link BigQuerySchemaTransformWriteProvider}.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class BigQuerySchemaTransformWriteConfiguration {

/** Instantiates a {@link BigQuerySchemaTransformWriteConfiguration.Builder}. */
public static Builder builder() {
return new AutoValue_BigQuerySchemaTransformWriteConfiguration.Builder();
}

private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
private static final TypeDescriptor<BigQuerySchemaTransformWriteConfiguration> TYPE_DESCRIPTOR =
TypeDescriptor.of(BigQuerySchemaTransformWriteConfiguration.class);
private static final SerializableFunction<BigQuerySchemaTransformWriteConfiguration, Row>
ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);

/**
* Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
* expected format.
*/
public abstract String getTableSpec();

/** Specifies whether the table should be created if it does not exist. */
public abstract String getCreateDisposition();

/** Specifies what to do with existing data in the table, in case the table already exists. */
public abstract String getWriteDisposition();

/** Serializes configuration to a {@link Row}. */
Row toBeamRow() {
return ROW_SERIALIZABLE_FUNCTION.apply(this);
}

@AutoValue.Builder
public abstract static class Builder {

/**
* Writes to the given table specification. See {@link BigQueryIO.Write#to(String)}} for the
* expected format.
*/
public abstract Builder setTableSpec(String value);

/** Specifies whether the table should be created if it does not exist. */
public abstract Builder setCreateDisposition(String value);

/** Specifies what to do with existing data in the table, in case the table already exists. */
public abstract Builder setWriteDisposition(String value);

/** Builds the {@link BigQuerySchemaTransformWriteConfiguration} configuration. */
public abstract BigQuerySchemaTransformWriteConfiguration build();
}
}
Loading