Skip to content

Commit

Permalink
Merge pull request #17115 from ibzib/bq-display-data
Browse files Browse the repository at this point in the history
[BEAM-14124] Add display data to BQ storage reads.
  • Loading branch information
ibzib authored Mar 21, 2022
2 parents 83131fb + ce71caf commit 57c8647
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,7 @@ public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> par
.setMethod(TypedRead.Method.DEFAULT)
.setUseAvroLogicalTypes(false)
.setFormat(DataFormat.AVRO)
.setProjectionPushdownApplied(false)
.build();
}

Expand Down Expand Up @@ -805,6 +806,8 @@ abstract static class Builder<T> {
abstract Builder<T> setFromBeamRowFn(FromBeamRowFunction<T> fromRowFn);

abstract Builder<T> setUseAvroLogicalTypes(Boolean useAvroLogicalTypes);

abstract Builder<T> setProjectionPushdownApplied(boolean projectionPushdownApplied);
}

abstract @Nullable ValueProvider<String> getJsonTableRef();
Expand Down Expand Up @@ -853,6 +856,8 @@ abstract static class Builder<T> {

abstract Boolean getUseAvroLogicalTypes();

abstract boolean getProjectionPushdownApplied();

/**
* An enumeration type for the priority of a query.
*
Expand Down Expand Up @@ -1229,7 +1234,8 @@ private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
getRowRestriction(),
getParseFn(),
outputCoder,
getBigQueryServices())));
getBigQueryServices(),
getProjectionPushdownApplied())));
}

checkArgument(
Expand Down Expand Up @@ -1430,6 +1436,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider()))
.withLabel("Table"))
.addIfNotNull(DisplayData.item("query", getQuery()).withLabel("Query"))
.addIfNotDefault(
DisplayData.item("projectionPushdownApplied", getProjectionPushdownApplied())
.withLabel("Projection Pushdown Applied"),
false)
.addIfNotNull(
DisplayData.item("flattenResults", getFlattenResults())
.withLabel("Flatten Query Results"))
Expand All @@ -1438,6 +1448,13 @@ public void populateDisplayData(DisplayData.Builder builder) {
.withLabel("Use Legacy SQL Dialect"))
.addIfNotDefault(
DisplayData.item("validation", getValidate()).withLabel("Validation Enabled"), true);

ValueProvider<List<String>> selectedFieldsProvider = getSelectedFields();
if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) {
builder.add(
DisplayData.item("selectedFields", String.join(", ", selectedFieldsProvider.get()))
.withLabel("Selected Fields"));
}
}

/** Ensures that methods of the from() / fromQuery() family are called at most once. */
Expand Down Expand Up @@ -1623,6 +1640,11 @@ public TypedRead<T> useAvroLogicalTypes() {
return toBuilder().setUseAvroLogicalTypes(true).build();
}

@VisibleForTesting
TypedRead<T> withProjectionPushdownApplied() {
return toBuilder().setProjectionPushdownApplied(true).build();
}

@Override
public boolean supportsProjectionPushdown() {
// We can't do projection pushdown when a query is set. The query may project certain fields
Expand All @@ -1643,7 +1665,7 @@ public PTransform<PBegin, PCollection<T>> actuateProjectionPushdown(
outputFields.keySet());
ImmutableList<String> fields =
ImmutableList.copyOf(fieldAccessDescriptor.fieldNamesAccessed());
return withSelectedFields(fields);
return withSelectedFields(fields).withProjectionPushdownApplied();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,29 @@ public class BigQueryStorageTableSource<T> extends BigQueryStorageSourceBase<T>

private static final Logger LOG = LoggerFactory.getLogger(BigQueryStorageTableSource.class);

private final ValueProvider<TableReference> tableReferenceProvider;
private final boolean projectionPushdownApplied;

private transient AtomicReference<Table> cachedTable;

public static <T> BigQueryStorageTableSource<T> create(
ValueProvider<TableReference> tableRefProvider,
DataFormat format,
@Nullable ValueProvider<List<String>> selectedFields,
@Nullable ValueProvider<String> rowRestriction,
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
BigQueryServices bqServices) {
BigQueryServices bqServices,
boolean projectionPushdownApplied) {
return new BigQueryStorageTableSource<>(
tableRefProvider, format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices);
tableRefProvider,
format,
selectedFields,
rowRestriction,
parseFn,
outputCoder,
bqServices,
projectionPushdownApplied);
}

public static <T> BigQueryStorageTableSource<T> create(
Expand All @@ -65,23 +78,28 @@ public static <T> BigQueryStorageTableSource<T> create(
Coder<T> outputCoder,
BigQueryServices bqServices) {
return new BigQueryStorageTableSource<>(
tableRefProvider, null, selectedFields, rowRestriction, parseFn, outputCoder, bqServices);
tableRefProvider,
null,
selectedFields,
rowRestriction,
parseFn,
outputCoder,
bqServices,
false);
}

private final ValueProvider<TableReference> tableReferenceProvider;

private transient AtomicReference<Table> cachedTable;

private BigQueryStorageTableSource(
ValueProvider<TableReference> tableRefProvider,
DataFormat format,
@Nullable ValueProvider<List<String>> selectedFields,
@Nullable ValueProvider<String> rowRestriction,
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
BigQueryServices bqServices) {
BigQueryServices bqServices,
boolean projectionPushdownApplied) {
super(format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices);
this.tableReferenceProvider = checkNotNull(tableRefProvider, "tableRefProvider");
this.projectionPushdownApplied = projectionPushdownApplied;
cachedTable = new AtomicReference<>();
}

Expand All @@ -93,9 +111,21 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.addIfNotNull(
DisplayData.item("table", BigQueryHelpers.displayTable(tableReferenceProvider))
.withLabel("Table"));
builder
.addIfNotNull(
DisplayData.item("table", BigQueryHelpers.displayTable(tableReferenceProvider))
.withLabel("Table"))
.addIfNotDefault(
DisplayData.item("projectionPushdownApplied", projectionPushdownApplied)
.withLabel("Projection Pushdown Applied"),
false);

if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) {
builder.add(
DisplayData.item("selectedFields", String.join(", ", selectedFieldsProvider.get()))
.withLabel("Selected Fields"));
}

// Note: This transform does not set launchesBigQueryJobs because it doesn't launch
// BigQuery jobs, but instead uses the storage api to directly read the table.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,13 @@ public void testDisplayData() {
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.withSelectedFields(ImmutableList.of("foo", "bar"))
.withProjectionPushdownApplied()
.from(tableSpec);
DisplayData displayData = DisplayData.from(typedRead);
assertThat(displayData, hasDisplayItem("table", tableSpec));
assertThat(displayData, hasDisplayItem("selectedFields", "foo, bar"));
assertThat(displayData, hasDisplayItem("projectionPushdownApplied", true));
}

@Test
Expand Down Expand Up @@ -2097,6 +2101,7 @@ record ->
TypedRead<Row> pushdownRead = (TypedRead<Row>) pushdownT;
assertEquals(Method.DIRECT_READ, pushdownRead.getMethod());
assertThat(pushdownRead.getSelectedFields().get(), Matchers.containsInAnyOrder("foo"));
assertTrue(pushdownRead.getProjectionPushdownApplied());
}

@Test
Expand Down

0 comments on commit 57c8647

Please sign in to comment.