Skip to content

Commit

Permalink
Revert "Remove TableSchema to JSON conversion. (#28274)" (#28533)
Browse files Browse the repository at this point in the history
This reverts commit 7e83059.
  • Loading branch information
Abacn authored Sep 19, 2023
1 parent 6c4bdf1 commit a90879a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -50,6 +49,7 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -132,10 +132,13 @@
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
Expand Down Expand Up @@ -646,19 +649,29 @@ public static TypedRead<TableRow> readTableRowsWithSchema() {
BigQueryUtils.tableRowFromBeamRow());
}

private static class TableSchemaFunction
implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
@Override
public @Nullable TableSchema apply(@Nullable String input) {
return BigQueryHelpers.fromJsonString(input, TableSchema.class);
}
}

@VisibleForTesting
static class GenericDatumTransformer<T> implements DatumReader<T> {
private final SerializableFunction<SchemaAndRecord, T> parseFn;
private final TableSchema tableSchema;
private final Supplier<TableSchema> tableSchema;
private GenericDatumReader<T> reader;
private org.apache.avro.Schema writerSchema;

public GenericDatumTransformer(
SerializableFunction<SchemaAndRecord, T> parseFn,
TableSchema tableSchema,
String tableSchema,
org.apache.avro.Schema writer) {
this.parseFn = parseFn;
this.tableSchema = tableSchema;
this.tableSchema =
Suppliers.memoize(
Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema)));
this.writerSchema = writer;
this.reader = new GenericDatumReader<>(this.writerSchema);
}
Expand All @@ -676,7 +689,7 @@ public void setSchema(org.apache.avro.Schema schema) {
@Override
public T read(T reuse, Decoder in) throws IOException {
GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
return parseFn.apply(new SchemaAndRecord(record, this.tableSchema));
return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get()));
}
}

Expand Down Expand Up @@ -708,9 +721,16 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
.setDatumReaderFactory(
(SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
input -> {
TableSchema safeInput = checkStateNotNull(input);
return (AvroSource.DatumReaderFactory<T>)
(writer, reader) -> new GenericDatumTransformer<>(parseFn, safeInput, writer);
try {
String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input);
return (AvroSource.DatumReaderFactory<T>)
(writer, reader) ->
new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer);
} catch (IOException e) {
LOG.warn(
String.format("Error while converting table schema %s to JSON!", input), e);
return null;
}
})
// TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed.
.setParseFn(parseFn)
Expand Down Expand Up @@ -3366,7 +3386,9 @@ private <DestinationT> WriteResult expandTyped(
@SuppressWarnings({"unchecked", "nullness"})
Descriptors.Descriptor descriptor =
(Descriptors.Descriptor)
checkStateNotNull(writeProtoClass.getMethod("getDescriptor")).invoke(null);
org.apache.beam.sdk.util.Preconditions.checkStateNotNull(
writeProtoClass.getMethod("getDescriptor"))
.invoke(null);
TableSchema tableSchema =
TableRowToStorageApiProto.protoSchemaToTableSchema(
TableRowToStorageApiProto.tableSchemaFromDescriptor(descriptor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -144,11 +143,18 @@ public void evaluate() throws Throwable {

private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>
datumReaderFactoryFn =
input ->
(AvroSource.DatumReaderFactory<TableRow>)
(writer, reader) ->
new BigQueryIO.GenericDatumTransformer<>(
BigQueryIO.TableRowParser.INSTANCE, checkStateNotNull(input), writer);
(SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>)
input -> {
try {
String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input);
return (AvroSource.DatumReaderFactory<TableRow>)
(writer, reader) ->
new BigQueryIO.GenericDatumTransformer<>(
BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer);
} catch (IOException e) {
return null;
}
};

private static class MyData implements Serializable {
private String name;
Expand Down

0 comments on commit a90879a

Please sign in to comment.