From ce71cafc4801474ba7791a194200e4599b0b9e33 Mon Sep 17 00:00:00 2001 From: Kyle Weaver Date: Thu, 17 Mar 2022 13:12:35 -0700 Subject: [PATCH] [BEAM-14124] Add display data to BQ storage reads. Add display data for "Selected fields" and "Projection pushdown applied". I also want to add one for "Number of fields pushed down", but that will be a little more involved so I'll do it in a separate PR. --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 26 +++++++++- .../bigquery/BigQueryStorageTableSource.java | 52 +++++++++++++++---- .../bigquery/BigQueryIOStorageReadTest.java | 5 ++ 3 files changed, 70 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 9786d90a272e..c510db78b785 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -590,6 +590,7 @@ public static TypedRead read(SerializableFunction par .setMethod(TypedRead.Method.DEFAULT) .setUseAvroLogicalTypes(false) .setFormat(DataFormat.AVRO) + .setProjectionPushdownApplied(false) .build(); } @@ -805,6 +806,8 @@ abstract static class Builder { abstract Builder setFromBeamRowFn(FromBeamRowFunction fromRowFn); abstract Builder setUseAvroLogicalTypes(Boolean useAvroLogicalTypes); + + abstract Builder setProjectionPushdownApplied(boolean projectionPushdownApplied); } abstract @Nullable ValueProvider getJsonTableRef(); @@ -853,6 +856,8 @@ abstract static class Builder { abstract Boolean getUseAvroLogicalTypes(); + abstract boolean getProjectionPushdownApplied(); + /** * An enumeration type for the priority of a query. * @@ -1229,7 +1234,8 @@ private PCollection expandForDirectRead(PBegin input, Coder outputCoder) { getRowRestriction(), getParseFn(), outputCoder, - getBigQueryServices()))); + getBigQueryServices(), + getProjectionPushdownApplied()))); } checkArgument( @@ -1430,6 +1436,10 @@ public void populateDisplayData(DisplayData.Builder builder) { DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider())) .withLabel("Table")) .addIfNotNull(DisplayData.item("query", getQuery()).withLabel("Query")) + .addIfNotDefault( + DisplayData.item("projectionPushdownApplied", getProjectionPushdownApplied()) + .withLabel("Projection Pushdown Applied"), + false) .addIfNotNull( DisplayData.item("flattenResults", getFlattenResults()) .withLabel("Flatten Query Results")) @@ -1438,6 +1448,13 @@ public void populateDisplayData(DisplayData.Builder builder) { .withLabel("Use Legacy SQL Dialect")) .addIfNotDefault( DisplayData.item("validation", getValidate()).withLabel("Validation Enabled"), true); + + ValueProvider> selectedFieldsProvider = getSelectedFields(); + if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) { + builder.add( + DisplayData.item("selectedFields", String.join(", ", selectedFieldsProvider.get())) + .withLabel("Selected Fields")); + } } /** Ensures that methods of the from() / fromQuery() family are called at most once. */ @@ -1623,6 +1640,11 @@ public TypedRead useAvroLogicalTypes() { return toBuilder().setUseAvroLogicalTypes(true).build(); } + @VisibleForTesting + TypedRead withProjectionPushdownApplied() { + return toBuilder().setProjectionPushdownApplied(true).build(); + } + @Override public boolean supportsProjectionPushdown() { // We can't do projection pushdown when a query is set. The query may project certain fields @@ -1643,7 +1665,7 @@ public PTransform> actuateProjectionPushdown( outputFields.keySet()); ImmutableList fields = ImmutableList.copyOf(fieldAccessDescriptor.fieldNamesAccessed()); - return withSelectedFields(fields); + return withSelectedFields(fields).withProjectionPushdownApplied(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java index 2a14cd50984f..c53cab3d2c13 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java @@ -45,6 +45,11 @@ public class BigQueryStorageTableSource extends BigQueryStorageSourceBase private static final Logger LOG = LoggerFactory.getLogger(BigQueryStorageTableSource.class); + private final ValueProvider tableReferenceProvider; + private final boolean projectionPushdownApplied; + + private transient AtomicReference cachedTable; + public static BigQueryStorageTableSource create( ValueProvider tableRefProvider, DataFormat format, @@ -52,9 +57,17 @@ public static BigQueryStorageTableSource create( @Nullable ValueProvider rowRestriction, SerializableFunction parseFn, Coder outputCoder, - BigQueryServices bqServices) { + BigQueryServices bqServices, + boolean projectionPushdownApplied) { return new BigQueryStorageTableSource<>( - tableRefProvider, format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices); + tableRefProvider, + format, + selectedFields, + rowRestriction, + parseFn, + outputCoder, + bqServices, + projectionPushdownApplied); } public static BigQueryStorageTableSource create( @@ -65,13 +78,16 @@ public static BigQueryStorageTableSource create( Coder outputCoder, BigQueryServices bqServices) { return new BigQueryStorageTableSource<>( - tableRefProvider, null, selectedFields, rowRestriction, parseFn, outputCoder, bqServices); + tableRefProvider, + null, + selectedFields, + rowRestriction, + parseFn, + outputCoder, + bqServices, + false); } - private final ValueProvider tableReferenceProvider; - - private transient AtomicReference
cachedTable; - private BigQueryStorageTableSource( ValueProvider tableRefProvider, DataFormat format, @@ -79,9 +95,11 @@ private BigQueryStorageTableSource( @Nullable ValueProvider rowRestriction, SerializableFunction parseFn, Coder outputCoder, - BigQueryServices bqServices) { + BigQueryServices bqServices, + boolean projectionPushdownApplied) { super(format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices); this.tableReferenceProvider = checkNotNull(tableRefProvider, "tableRefProvider"); + this.projectionPushdownApplied = projectionPushdownApplied; cachedTable = new AtomicReference<>(); } @@ -93,9 +111,21 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - builder.addIfNotNull( - DisplayData.item("table", BigQueryHelpers.displayTable(tableReferenceProvider)) - .withLabel("Table")); + builder + .addIfNotNull( + DisplayData.item("table", BigQueryHelpers.displayTable(tableReferenceProvider)) + .withLabel("Table")) + .addIfNotDefault( + DisplayData.item("projectionPushdownApplied", projectionPushdownApplied) + .withLabel("Projection Pushdown Applied"), + false); + + if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) { + builder.add( + DisplayData.item("selectedFields", String.join(", ", selectedFieldsProvider.get())) + .withLabel("Selected Fields")); + } + // Note: This transform does not set launchesBigQueryJobs because it doesn't launch // BigQuery jobs, but instead uses the storage api to directly read the table. } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java index 8f8ab867a910..e9a98a523591 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java @@ -279,9 +279,13 @@ public void testDisplayData() { BigQueryIO.read(new TableRowParser()) .withCoder(TableRowJsonCoder.of()) .withMethod(Method.DIRECT_READ) + .withSelectedFields(ImmutableList.of("foo", "bar")) + .withProjectionPushdownApplied() .from(tableSpec); DisplayData displayData = DisplayData.from(typedRead); assertThat(displayData, hasDisplayItem("table", tableSpec)); + assertThat(displayData, hasDisplayItem("selectedFields", "foo, bar")); + assertThat(displayData, hasDisplayItem("projectionPushdownApplied", true)); } @Test @@ -2097,6 +2101,7 @@ record -> TypedRead pushdownRead = (TypedRead) pushdownT; assertEquals(Method.DIRECT_READ, pushdownRead.getMethod()); assertThat(pushdownRead.getSelectedFields().get(), Matchers.containsInAnyOrder("foo")); + assertTrue(pushdownRead.getProjectionPushdownApplied()); } @Test