
[Bug]: ClassCastException when using custom DynamicDestination in BigQueryIO.Write #22543

Closed
dartiga opened this issue Jul 30, 2022 · 15 comments · Fixed by #22624

Comments

@dartiga

dartiga commented Jul 30, 2022

What happened?

Beam: 2.40

While using a custom DynamicDestinations in BigQueryIO.Write, I got the following exception:

java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.bigquery.TableDestination cannot be cast to java.lang.String
	at com.king.da.destinations.KingAppDestinations.getTable(KingAppDestinations.java:17)
	at org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination.processElement(UpdateSchemaDestination.java:131)
	at org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination$DoFnInvoker.invokeProcessElement(Unknown Source)

Find below a test case to reproduce:

import java.io.IOException;
import java.security.SecureRandom;
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions;
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

@RunWith(JUnit4.class)
public class UpdateSchemaDestinationTest {

    static final BigqueryClient BQ_CLIENT = new BigqueryClient(UpdateSchemaDestinationTest.class.getName());

    static final String DATASET_ID =
            "schema_update_options_class_cast_exception"
                + System.currentTimeMillis()
                + "_"
                + new SecureRandom().nextInt(32);

    static TestBigQueryOptions options = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class);

    static Pipeline pipeline;

    @BeforeClass
    public static void setUpAll() throws IOException, InterruptedException {
        options.setTempLocation(options.getTempRoot() + "/bq_it_temp");

        pipeline = Pipeline.create(options);

        BQ_CLIENT.createNewDataset(options.getProject(), DATASET_ID);
    }

    @AfterClass
    public static void tearDownAll() {
        BQ_CLIENT.deleteDataset(options.getProject(), DATASET_ID);
    }

    @Test
    public void classCastExceptionRegression() {
        DynamicDestinations<KV<String, String>, String> destinations = new SomeDynamicDestinations();

        PCollection<KV<String, String>> rows = pipeline
            .apply(Create.of(
                KV.of("table","foo"),
                KV.of("table","bar")
            ));

        rows.apply(BigQueryIO.<KV<String, String>>write()
            .withFormatFunction(kv -> new TableRow().set("name", kv.getValue()))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMaxBytesPerPartition(1)
            .to(destinations));

        pipeline.run().waitUntilFinish();
    }

    private static final class SomeDynamicDestinations extends DynamicDestinations<KV<String, String>, String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String getDestination(@Nullable ValueInSingleWindow<KV<String, String>> element) {
            return element.getValue().getKey();
        }

        @Override
        public TableDestination getTable(String destination) {
            return new TableDestination(DATASET_ID + "." + destination, "a table");
        }

        @Override
        public @Nullable TableSchema getSchema(String destination) {
            return new TableSchema().setFields(Arrays.asList(new TableFieldSchema().setName("name").setType("STRING")));
        }
    }
}

Issue Priority

Priority: 1

Issue Component

Component: io-java-gcp

@johnjcasey (Contributor)

I am not able to replicate this exception with the code you provided.

I suspect the issue is that your DynamicDestinations expects a String argument to the getTable method, but a TableDestination is being passed instead, causing the cast exception.
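
Because Java erases generic type parameters at runtime, the compiler cannot catch a destination value of the wrong type crossing this boundary; the failure only surfaces at the compiler-inserted cast. Below is a minimal, Beam-free sketch of that mechanism — all names here are illustrative stand-ins, not Beam's API:

```java
import java.util.function.Function;

public class CastSketch {
    // User code parameterized on String destinations, analogous to the
    // reporter's DynamicDestinations<KV<String, String>, String>.getTable(String).
    static final Function<String, String> GET_TABLE =
        destination -> "dataset." + destination;

    @SuppressWarnings({"rawtypes", "unchecked"})
    public static void main(String[] args) {
        // Generics are erased at runtime, so a raw reference lets a value
        // that is not a String slip past the compiler's checks...
        Function rawGetTable = GET_TABLE;
        Object notAString = new Object(); // stand-in for a TableDestination
        try {
            // ...and the cast the compiler inserted inside the lambda's
            // apply(Object) fails here, just like the reported exception.
            rawGetTable.apply(notAString);
        } catch (ClassCastException expected) {
            System.out.println("ClassCastException, as in the report");
        }
    }
}
```

Running this prints the caught ClassCastException, mirroring how a user's getTable(String) fails when the framework hands it a TableDestination instead.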

@johnjcasey (Contributor)

Can you update your replication code to make this reproducible?

@dartiga (Author)

dartiga commented Aug 4, 2022

I am not able to replicate this exception with the code you provided.

Are you getting any error, or just a green test? I am able to replicate the issue using the code I posted:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.bigquery.TableDestination cannot be cast to java.lang.String
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
	at com.king.da.UpdateSchemaDestinationTest.classCastExceptionRegression(UpdateSchemaDestinationTest.java:78)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
	at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
	at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
	at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
	at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
	at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:95)
	at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:91)
	at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:60)
	at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:98)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:529)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:756)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:452)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.bigquery.TableDestination cannot be cast to java.lang.String
	at com.king.da.UpdateSchemaDestinationTest$SomeDynamicDestinations.getTable(UpdateSchemaDestinationTest.java:1)
	at org.apache.beam.sdk.io.gcp.bigquery.UpdateSchemaDestination.processElement(UpdateSchemaDestination.java:131)

@dartiga (Author)

dartiga commented Aug 4, 2022

I updated the code with the missing imports, just in case.

@johnjcasey (Contributor)

I'm running into a separate issue

com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
POST https://bigquery.googleapis.com/bigquery/v2/projects/google.com:clouddfe/jobs?prettyPrint=false
{
"code" : 400,
"errors" : [ {
"domain" : "global",
"message" : "Invalid path: /Users/johnjcasey/vc/beam-java/sdks/java/io/google-cloud-platform/null/bq_it_temp/BigQueryWriteTemp/beam_bq_job_LOAD_testpipelinejohnjcasey0804135744c736b253_9d89ed77264a49a3a5d5d94dca3004c4/b7a2b0b3-859e-4869-957f-a41375fb5d96",
"reason" : "invalid"
} ],
"message" : "Invalid path: /Users/johnjcasey/vc/beam-java/sdks/java/io/google-cloud-platform/null/bq_it_temp/BigQueryWriteTemp/beam_bq_job_LOAD_testpipelinejohnjcasey0804135744c736b253_9d89ed77264a49a3a5d5d94dca3004c4/b7a2b0b3-859e-4869-957f-a41375fb5d96",
"status" : "INVALID_ARGUMENT"
}

which appears to occur after any destination resolution.

@dartiga (Author)

dartiga commented Aug 4, 2022

That sounds like your tempLocation is not valid. Let me see if I can reproduce your error and update the code.

@dartiga (Author)

dartiga commented Aug 4, 2022

To fix your issue, you may need to provide a valid GCS bucket (one you have write permission for) when setting the tempLocation option.

So either replace

options.setTempLocation(options.getTempRoot() + "/bq_it_temp");

with something like:

options.setTempLocation("gs://<bucket>/bq_it_temp");

Or you run the test specifying --tempRoot=gs://<bucket> in the command line.

@dartiga (Author)

dartiga commented Aug 4, 2022

Beam ITs follow the same approach:

"--tempRoot=${gcpTempRoot}",

@johnjcasey (Contributor)

I'm replicating your issue now, and what I'm seeing is what I thought.

The destination being passed in is already a TableDestination, which causes the ClassCastException.

@johnjcasey (Contributor)

@ahmedabu98 this is the BQ issue.

@ahmedabu98 (Contributor)

The write behaves pretty strangely. Here's what I found:

The values do end up being written into BQ, but a new table is created for each value.
(screenshot omitted)

This is for FILE_LOADS. I'm not observing the same behavior when switching the write method to STREAMING_INSERTS. Currently trying with STORAGE_WRITE_API though I'm running into separate issues there.

@dartiga (Author)

dartiga commented Aug 4, 2022

That's because UpdateSchemaDestination is used in BatchLoads only. It was introduced in 2.40.

As @johnjcasey mentioned, our custom DynamicDestinations expects to receive a String, while UpdateSchemaDestination always assumes TableDestination instances.

These two will fail for sure.

@ahmedabu98 (Contributor)

ahmedabu98 commented Aug 5, 2022

Looks like the user-specified DestinationT type is first replaced with TableDestination during the write here (DestinationT in, TableDestination out).

The WriteTables DoFn returns TableDestination types instead of DestinationT, which is carried downstream to UpdateSchemaDestination.
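
This type flow can be sketched without Beam: a method whose signature promises DestinationT actually returns a TableDestination through an unchecked cast, and the mismatch only explodes at the boundary back into user code. All class and method names below are simplified stand-ins, not Beam's real WriteTables or UpdateSchemaDestination:

```java
public class DestinationFlowSketch {
    static class TableDestination {
        final String spec;
        TableDestination(String spec) { this.spec = spec; }
    }

    // Stand-in for WriteTables: declared to return DestinationT but, like the
    // buggy code path, actually returns a TableDestination via an unchecked cast.
    @SuppressWarnings("unchecked")
    static <DestinationT> DestinationT writeTables(DestinationT destination) {
        return (DestinationT) new TableDestination("dataset." + destination);
    }

    // Stand-in for UpdateSchemaDestination handing the carried value back to
    // user code that expects its own DestinationT (String here).
    static String updateSchemaDestination(Object carried) {
        // The cast javac inserts at this boundary is where the CCE surfaces.
        String destination = (String) carried;
        return "dataset." + destination;
    }

    public static void main(String[] args) {
        Object carried = writeTables("my_table"); // really a TableDestination
        try {
            updateSchemaDestination(carried);
        } catch (ClassCastException expected) {
            System.out.println("fails as in #22543: " + expected);
        }
    }
}
```

The erased generic lets writeTables compile cleanly; nothing checks the actual runtime type until user code receives it, which matches the stack trace in this issue.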

@ahmedabu98 (Contributor)

.take-issue

@ahmedabu98 (Contributor)

With the changes in #22624 I was able to write successfully with the following pipeline:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.checkerframework.checker.nullness.qual.Nullable;
 
public class UpdateSchemaDestinationWrite {
 
  static final String PROJECT_AND_DATASET_ID = "<myproject>:<mydataset>.";
 
  private static final class SomeDynamicDestinations
      extends DynamicDestinations<KV<String, String>, String> {
 
    private static final long serialVersionUID = 1L;
 
    @Override
    public String getDestination(@Nullable ValueInSingleWindow<KV<String, String>> element) {
      return element.getValue().getKey();
    }
 
    @Override
    public TableDestination getTable(String destination) {
      return new TableDestination(PROJECT_AND_DATASET_ID + destination, "a table");
    }
 
    @Override
    public @Nullable TableSchema getSchema(String destination) {
      return new TableSchema()
          .setFields(Arrays.asList(new TableFieldSchema().setName("name").setType("STRING")));
    }
  }
 
  public static void main(String[] args) {
 
    BigQueryOptions bqOptions = PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
    Pipeline pipeline = Pipeline.create(bqOptions);
 
    DynamicDestinations<KV<String, String>, String> destinations = new SomeDynamicDestinations();
 
    PCollection<KV<String, String>> rows =
        pipeline.apply(Create.of(KV.of("my_table", "hi"), KV.of("my_table", "hello")));
 
    rows.apply(
        BigQueryIO.<KV<String, String>>write()
            .withFormatFunction(kv -> new TableRow().set("name", kv.getValue()))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMaxBytesPerPartition(1)
            .to(destinations));
 
    pipeline.run().waitUntilFinish();
  }
}

johnjcasey pushed a commit that referenced this issue Aug 26, 2022
…) fixing #22543

* keeping hold of user specified dynamic destination type to be able to use it in UpdateSchemaDestinations

* fix for testWriteTables

* cleanup and support default project when not included in table ref

* allow side inputs called from getTable()

* style fixes