diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 7a1d5ff46fe3..928f4a253c60 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -381,10 +381,7 @@ private WriteResult expandTriggered(PCollection> inpu WriteDisposition.WRITE_APPEND, CreateDisposition.CREATE_NEVER, maxRetryJobs, - ignoreUnknownValues, kmsKey, - rowWriterFactory.getSourceFormat(), - useAvroLogicalTypes, schemaUpdateOptions, dynamicDestinations)) .withSideInputs(sideInputsForUpdateSchema)) @@ -480,10 +477,7 @@ public WriteResult expandUntriggered(PCollection> inp WriteDisposition.WRITE_APPEND, CreateDisposition.CREATE_NEVER, maxRetryJobs, - ignoreUnknownValues, kmsKey, - rowWriterFactory.getSourceFormat(), - useAvroLogicalTypes, schemaUpdateOptions, dynamicDestinations)) .withSideInputs(sideInputsForUpdateSchema)) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java index 4130b09556ed..a7aa864d578f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java @@ -53,17 +53,13 @@ public class UpdateSchemaDestination private final PCollectionView loadJobIdPrefixView; private final ValueProvider loadJobProjectId; private transient @Nullable DatasetService datasetService; - private final int maxRetryJobs; private final @Nullable String kmsKey; - private final String sourceFormat; - private final boolean useAvroLogicalTypes; private @Nullable BigQueryServices.JobService jobService; - private final boolean ignoreUnknownValues; private final Set schemaUpdateOptions; - private BigQueryIO.Write.WriteDisposition writeDisposition; - private BigQueryIO.Write.CreateDisposition createDisposition; - private DynamicDestinations dynamicDestinations; + private final BigQueryIO.Write.WriteDisposition writeDisposition; + private final BigQueryIO.Write.CreateDisposition createDisposition; + private final DynamicDestinations dynamicDestinations; private static class PendingJobData { final BigQueryHelpers.PendingJob retryJob; @@ -80,7 +76,7 @@ public PendingJobData( } } - private List pendingJobs = Lists.newArrayList(); + private final List pendingJobs = Lists.newArrayList(); public UpdateSchemaDestination( BigQueryServices bqServices, @@ -89,20 +85,14 @@ public UpdateSchemaDestination( BigQueryIO.Write.WriteDisposition writeDisposition, BigQueryIO.Write.CreateDisposition createDisposition, int maxRetryJobs, - boolean ignoreUnknownValues, @Nullable String kmsKey, - String sourceFormat, - boolean useAvroLogicalTypes, Set schemaUpdateOptions, DynamicDestinations dynamicDestinations) { this.loadJobProjectId = loadJobProjectId; this.loadJobIdPrefixView = loadJobIdPrefixView; this.bqServices = bqServices; this.maxRetryJobs = maxRetryJobs; - this.ignoreUnknownValues = ignoreUnknownValues; this.kmsKey = kmsKey; - this.sourceFormat = sourceFormat; - this.useAvroLogicalTypes = useAvroLogicalTypes; this.schemaUpdateOptions = schemaUpdateOptions; this.createDisposition = createDisposition; this.writeDisposition = writeDisposition; @@ -217,14 +207,10 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( .setSchema(schema) .setWriteDisposition(writeDisposition.name()) .setCreateDisposition(createDisposition.name()) - .setSourceFormat(sourceFormat) - .setIgnoreUnknownValues(ignoreUnknownValues) - .setUseAvroLogicalTypes(useAvroLogicalTypes); + .setSourceFormat("NEWLINE_DELIMITED_JSON"); if (schemaUpdateOptions != null) { List options = - schemaUpdateOptions.stream() - .map(Enum::name) - .collect(Collectors.toList()); + schemaUpdateOptions.stream().map(Enum::name).collect(Collectors.toList()); loadConfig.setSchemaUpdateOptions(options); } if (!loadConfig @@ -235,7 +221,7 @@ private BigQueryHelpers.PendingJob startZeroLoadJob( .equals(BigQueryIO.Write.WriteDisposition.WRITE_APPEND.toString())) { return null; } - Table destinationTable = null; + final Table destinationTable; try { destinationTable = datasetService.getTable(tableReference); if (destinationTable == null) {