[Bug]: PulsarIO write() complains of missing ClientUrl even when provided with withClientUrl #30152

Open
1 of 16 tasks
ton3r opened this issue Jan 30, 2024 · 1 comment

ton3r commented Jan 30, 2024

What happened?

I want to write to a Pulsar topic from an Apache Beam pipeline using:

pCollection.apply("Send to Pulsar", PulsarIO.write().withClientUrl(pulsarClientUrl).withTopic(pulsarTopic));

But this throws 'IllegalStateException: Missing required properties: clientUrl'.

While debugging, I found that PulsarIO.write() calls the private AutoValue_PulsarIO_Write.Builder().build(), and build() checks clientUrl:

PulsarIO.Write build() {
    if (this.clientUrl == null) {
        String missing = " clientUrl";
        throw new IllegalStateException("Missing required properties:" + missing);
    } else {
        return new AutoValue_PulsarIO_Write(this.topic, this.clientUrl);
    }
}

But I can only set clientUrl after PulsarIO.write() has returned, and that call has already run the clientUrl check.

So it seems impossible to create a PulsarIO.write() at all.
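
To make the ordering problem concrete, here is a minimal sketch that reproduces the pattern without Beam or Pulsar on the classpath. Every name in it (BuilderOrderSketch, strictWrite, lenientWrite, validate) is hypothetical and only imitates the shape of the generated code quoted above. The lenient variant is an assumption about one possible way out (treat clientUrl as optional at build time and validate later); it is not a claim about how PulsarIO is or should be fixed.

```java
// Simplified stand-in for the builder pattern described in this issue.
// All names are hypothetical; this is not Beam's PulsarIO source.
public class BuilderOrderSketch {

  static final class Write {
    final String topic;
    final String clientUrl;

    Write(String topic, String clientUrl) {
      this.topic = topic;
      this.clientUrl = clientUrl;
    }

    // Mirrors the reported shape: build() runs before any withX() call can happen.
    static Write strictWrite() {
      return new Builder(true).build();
    }

    // Assumed alternative: clientUrl may be absent at build time.
    static Write lenientWrite() {
      return new Builder(false).build();
    }

    Write withClientUrl(String url) {
      return new Write(topic, url);
    }

    Write withTopic(String newTopic) {
      return new Write(newTopic, clientUrl);
    }

    // Deferred check, meant to run when the transform is actually used.
    void validate() {
      if (clientUrl == null) {
        throw new IllegalStateException("clientUrl must be set before use");
      }
    }
  }

  static final class Builder {
    private final boolean clientUrlRequired;
    private String topic;
    private String clientUrl;

    Builder(boolean clientUrlRequired) {
      this.clientUrlRequired = clientUrlRequired;
    }

    Write build() {
      if (clientUrlRequired && clientUrl == null) {
        throw new IllegalStateException("Missing required properties: clientUrl");
      }
      return new Write(topic, clientUrl);
    }
  }

  public static void main(String[] args) {
    try {
      // Fails inside strictWrite(), so withClientUrl() is never reached.
      Write.strictWrite().withClientUrl("pulsar://localhost:6650");
    } catch (IllegalStateException e) {
      System.out.println("strict: " + e.getMessage());
    }

    Write ok = Write.lenientWrite()
        .withClientUrl("pulsar://localhost:6650")
        .withTopic("my-topic");
    ok.validate();
    System.out.println("lenient: " + ok.clientUrl + " -> " + ok.topic);
  }
}
```

In the strict variant the exception is raised inside the factory method itself, which matches the behaviour reported here: no argument passed to withClientUrl can help, because that call is never reached.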

Issue Priority

Priority: 3 (minor)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

ton3r commented Feb 1, 2024

I was able to "bend the spoon" in two ways: by using Java reflection to make the package-private members accessible, or by writing my own CustomPulsarWriter in the package org.apache.beam.sdk.io.pulsar so that it can extend the abstract classes. But now I get:


Exception in thread "main" java.lang.IllegalArgumentException: org.apache.beam.sdk.io.pulsar.WriteToPulsarDoFn: Non-splittable, but annotated as @UnboundedPerElement
	at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.throwIllegalArgument(DoFnSignatures.java:2409)
	at org.apache.beam.sdk.transforms.reflect.DoFnSignatures$ErrorReporter.checkArgument(DoFnSignatures.java:2415)
	at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.inferBoundedness(DoFnSignatures.java:917)
	at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.parseSignature(DoFnSignatures.java:848)
	at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1737)
	at org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getSignature(DoFnSignatures.java:301)
	at org.apache.beam.sdk.transforms.ParDo.validate(ParDo.java:614)
	at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:403)
	at org.apache.beam.sdk.io.pulsar.PulsarIO$Write.expand(PulsarIO.java:180)
	at org.apache.beam.sdk.io.pulsar.PulsarIO$Write.expand(PulsarIO.java:151)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:360)
	at de.spx.bucketdataflow.PubSubToGcs.main(PubSubToGcs.java:103)
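
One reading of this message, going by the stack trace (DoFnSignatures.inferBoundedness), is that the function class carries an @UnboundedPerElement annotation while its process method is not splittable, that is, it takes no restriction tracker parameter. The sketch below imitates that kind of check with plain reflection. The annotations, the RestrictionTracker marker and checkBoundedness are all hypothetical stand-ins defined inside the example; they are not Beam's classes, and the real check may differ in detail.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;

// Hypothetical stand-ins only; none of these types are Beam's real classes.
public class BoundednessCheckSketch {

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface UnboundedPerElement {}

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ProcessElement {}

  // In this sketch, a process method is "splittable" if it takes this parameter.
  interface RestrictionTracker {}

  // Annotated as unbounded, but the process method has no tracker: the failing shape.
  @UnboundedPerElement
  static class AnnotatedPlainFn {
    @ProcessElement
    public void processElement(String element) {}
  }

  // Same function without the class-level annotation: passes the check.
  static class PlainFn {
    @ProcessElement
    public void processElement(String element) {}
  }

  static boolean isSplittable(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)) {
        return Arrays.asList(m.getParameterTypes()).contains(RestrictionTracker.class);
      }
    }
    return false;
  }

  // Rejects the combination "annotated as unbounded" plus "not splittable".
  static void checkBoundedness(Class<?> fnClass) {
    boolean annotated = fnClass.isAnnotationPresent(UnboundedPerElement.class);
    if (annotated && !isSplittable(fnClass)) {
      throw new IllegalArgumentException(
          fnClass.getName() + ": Non-splittable, but annotated as @UnboundedPerElement");
    }
  }

  public static void main(String[] args) {
    checkBoundedness(PlainFn.class); // accepted
    try {
      checkBoundedness(AnnotatedPlainFn.class);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

If that reading is right, the write function would need either to drop the annotation or to become a splittable function; which of the two is intended for WriteToPulsarDoFn is not something this thread settles.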

@riteshghorse riteshghorse changed the title [Bug]: [Bug]: PulsarIO write() complains of missing ClientUrl even when provided with withClientUrl Feb 2, 2024
@riteshghorse riteshghorse added the io label Feb 2, 2024