Skip to content

Commit

Permalink
Disable BigQueryStorageStreamSource.splitAtFraction when read api v2 …
Browse files Browse the repository at this point in the history
…used (#30443)
  • Loading branch information
Abacn authored Mar 4, 2024
1 parent 6a03f9b commit fa43f82
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public static class BigQueryStorageStreamReader<T> extends BoundedSource.Bounded

// Values used for progress reporting.
private boolean splitPossible = true;
private boolean splitAllowed = true;
private double fractionConsumed;
private double progressAtResponseStart;
private double progressAtResponseEnd;
Expand Down Expand Up @@ -199,6 +200,8 @@ private BigQueryStorageStreamReader(
this.parseFn = source.parseFn;
this.storageClient = source.bqServices.getStorageClient(options);
this.tableSchema = fromJsonString(source.jsonTableSchema, TableSchema.class);
// With Storage Read API v2 the number of streams is determined on the server side,
// so splitting a stream from the client is not allowed.
this.splitAllowed = !options.getEnableStorageReadApiV2();
this.fractionConsumed = 0d;
this.progressAtResponseStart = 0d;
this.progressAtResponseEnd = 0d;
Expand Down Expand Up @@ -341,7 +344,7 @@ public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
return null;
}

if (!splitPossible) {
if (!splitPossible || !splitAllowed) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageStreamSource.BigQueryStorageStreamReader;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.FakeBigQueryServerStream;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
Expand Down Expand Up @@ -767,6 +769,39 @@ public void testStreamSourceSplit() throws Exception {
assertThat(streamSource.split(0, options), containsInAnyOrder(streamSource));
}

/**
 * Checks that a stream reader created with {@code --enableStorageReadApiV2} declines a dynamic
 * split request: {@code splitAtFraction(0.5)} must return {@code null} after the reader has been
 * started against a storage client that yields an empty response stream.
 */
@Test
public void testSplitReadStreamAtFraction() throws IOException {
  // Pipeline options with the Storage Read API v2 flag switched on.
  PipelineOptions v2Options =
      PipelineOptionsFactory.fromArgs("--enableStorageReadApiV2").create();

  // Stub the storage client so the expected ReadRows call returns a stream with no responses.
  List<ReadRowsResponse> noResponses = ImmutableList.of();
  ReadRowsRequest readRowsRequest =
      ReadRowsRequest.newBuilder().setReadStream("readStream").build();
  StorageClient storageClientMock = mock(StorageClient.class);
  when(storageClientMock.readRows(readRowsRequest, ""))
      .thenReturn(new FakeBigQueryServerStream<>(noResponses));

  // Minimal Avro-backed read session and the single stream the source reads from.
  ReadSession session =
      ReadSession.newBuilder()
          .setName("readSession")
          .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
          .build();
  ReadStream stream = ReadStream.newBuilder().setName("readStream").build();

  BigQueryStorageStreamSource<TableRow> source =
      BigQueryStorageStreamSource.create(
          session,
          stream,
          TABLE_SCHEMA,
          new TableRowParser(),
          TableRowJsonCoder.of(),
          new FakeBigQueryServices().withStorageClient(storageClientMock));

  BigQueryStorageStreamReader<TableRow> streamReader = source.createReader(v2Options);
  streamReader.start();

  // With read API v2 enabled the reader must refuse to split the stream.
  assertNull(streamReader.splitAtFraction(0.5));
}

@Test
public void testReadFromStreamSource() throws Exception {

Expand Down

0 comments on commit fa43f82

Please sign in to comment.