Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature Request][YAML]: KafkaIO for YAML #28664

Closed
1 of 15 tasks
brucearctor opened this issue Sep 26, 2023 · 17 comments
Closed
1 of 15 tasks

[Feature Request][YAML]: KafkaIO for YAML #28664

brucearctor opened this issue Sep 26, 2023 · 17 comments

Comments

@brucearctor
Copy link
Contributor

What would you like to happen?

We'd like Beam YAML to support Kafka :-)

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@brucearctor
Copy link
Contributor Author

@robertwb -- Jeff mentioned you might be working on this. Is that true? Otherwise, Ferran [ @ffernandez92 ] can take that on.

@robertwb
Copy link
Contributor

I've thought about this some, but haven't actually started any work here. It'd be great if Ferran takes this on.

It'd probably make sense to follow the same pattern as the pubsub one (namely, taking a "format" and "schema" parameter to convert to and from Row objects). (I don't know if kafka messages have the equivalent of PubSub attributes.)
JsonUtils.{beamSchemaFromJsonSchema,getJsonBytesToRowFunction} and AvroUtils.{toBeamSchema,getAvroBytesToRowFunction}.

Also, in this case there's no Python implementation, so no need for a Python variant. Just do it all in Java.

@brucearctor
Copy link
Contributor Author

brucearctor commented Sep 26, 2023

Assigning myself for now. I was unable to assign @ffernandez92 . Is there special permissions needed for that?

FYI --> We still have some things this week to address, AND, needing to get machine started to be able to run/test things, so might be greater than a week. @anyone message here if any questions.

@robertwb
Copy link
Contributor

Not sure what the permissions requirements are, looks like the assignee needs to comment on the issue first? https://github.com/apache/beam/blob/master/CONTRIBUTING.md#share-your-intent . In any case, the intent is clear, looking forward to this contribution.

This was referenced Sep 27, 2023
@ffernandez92
Copy link
Contributor

I've been pondering how to approach this issue. Initially, we can think about utilizing the "raw" type, where it would consist solely of the Row object with the payload represented as bytes. As for the schema, for the time being, we can reuse the _create_parser function. If we decide to enhance it later on, we might consider supporting additional types like avro or proto. Nevertheless, I believe it's best to keep it straightforward and allow the user to handle such complexity downstream through custom transformations, possibly in Java or Python. Any opinions here?

@robertwb
Copy link
Contributor

robertwb commented Oct 2, 2023

Supporting the "raw" types as bytes certainly is the most flexible and makes sense as a first pass. I do think we'll want to support json and avro (at least) similar to the _create_parser idea we have for PubSub (I extended that for JSON at #28754 ), though for KafkaIO this'd be in Java of course.

I just noticed that we already have https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java which is actually pretty complete. We'd probably want to add an option for the "raw" format that would pass the bytes through directly, as well as an option to add the key in if desired (similar to how the attributes can be appended as extra fields for PubSub).

@robertwb
Copy link
Contributor

robertwb commented Oct 2, 2023

And we'd also want to add error handling capabilities.

@brucearctor
Copy link
Contributor Author

FYI --> We're most interested in protobuf and json [ since we don't currently use avro ].

@robertwb
Copy link
Contributor

robertwb commented Oct 2, 2023

+1, protobuf support would be great to have too.

@ffernandez92
Copy link
Contributor

Thanks for the KafkaProvider @robertwb . I ran a quick test adding the necessary elements to standard_io.yaml

  transforms:
(...)
    'ReadFromKafka': 'ReadFromKafka'

      'ReadFromKafka':
        'schema': 'schema'
        'hash_code': 'hashCode'
        'consumer_config': 'consumerConfigUpdates'
        'format': 'format'
        'topic': 'topic'
        'bootstrap_servers': 'bootstrapServers'
        'confluent_schema_registry_url': 'confluentSchemaRegistryUrl'
(...)

        'ReadFromKafka': 'beam:schematransform:org.apache.beam:kafka_read:v1'

As a test. I ran the following yaml:

pipeline:
  transforms:
    - type: ReadFromKafka
      config:
        topic: test_topic
        format: JSON
        hash_code: 1
        bootstrap_servers: "localhost:9092"
        schema: '{ "type": "record", "name": "AdEvent", "fields": [{"name": "media_type", "type": "string"},{"name": "ad_type", "type": "int"}]}'

But it shows the following error:

Building pipeline...
INFO:apache_beam.yaml.yaml_transform:Expanding "ReadFromKafka" at line 3 
Traceback (most recent call last):
  File "/usr/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/main.py", line 75, in <module>
    run()
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/main.py", line 68, in run
    yaml_transform.expand_pipeline(p, pipeline_spec)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 970, in expand_pipeline
    return YamlTransform(
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 944, in expand
    result = expand_transform(
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 424, in expand_transform
    return expand_composite_transform(spec, scope)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 498, in expand_composite_transform
    return CompositePTransform.expand(None)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 489, in expand
    inner_scope.compute_all()
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 218, in compute_all
    self.compute_outputs(transform_id)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 68, in wrapper
    self._cache[key] = func(self, *args)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 252, in compute_outputs
    return expand_transform(self._transforms_by_uuid[transform_id], self)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 426, in expand_transform
    return expand_leaf_transform(spec, scope)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 448, in expand_leaf_transform
    ptransform = scope.create_ptransform(spec, inputs_dict.values())
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 351, in create_ptransform
    provider.requires_inputs(spec['type'], config)):
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_provider.py", line 719, in requires_inputs
    return self._underlying_provider.requires_inputs(typ, args)
  File "/home/ferranfernandez/beam/sdks/python/apache_beam/yaml/yaml_provider.py", line 172, in requires_inputs
    if self._urns[type] in self.schema_transforms():
KeyError: <class 'type'>

Most likely I'm missing something. When I registered the transform as "full python" under ReadFromKafka': 'apache_beam.yaml.yaml_io.read_from_kafka' it worked fine although it was limited to raw only. The only reason why I tried this is because I wanted to explore to expand the Java option [which I think is the right way to go]

@robertwb
Copy link
Contributor

robertwb commented Oct 2, 2023

Yes, Java is the right way to go. Looks like you hit #28775 (the java <-> python side is still not as fully vetted/tested).

@ffernandez92
Copy link
Contributor

Thanks! It worked. Most likely I'll create a different provider since I don't think this one covers everything we need. Besides that, I dropped a comment here: #28775 . I had to fix line 173 as well

@robertwb robertwb mentioned this issue Oct 2, 2023
3 tasks
@robertwb
Copy link
Contributor

robertwb commented Oct 2, 2023

What is the motivation to provide a different provider vs. extend this one? (These are not really used yet, but were prototyped in anticipation of something like YAML).

@ffernandez92
Copy link
Contributor

Oh, I see! I thought it might have been utilized elsewhere, so I was concerned that making an extension or modification that could potentially impact backward compatibility could pose an issue. However, I think extending is fine.

@brucearctor
Copy link
Contributor Author

Oh, I see! I thought it might have been utilized elsewhere, so I was concerned that making an extension or modification that could potentially impact backward compatibility could pose an issue. However, I think extending is fine.

Hoping that we wind up with tests in place so we can confidently modify/extend without worry. :-)

@ffernandez92
Copy link
Contributor

I was trying to run the previous code in Dataflow but i'm facing some issues, maybe you guys can help.

1 - I created the expansion service jar: ./gradlew :sdks:java:io:expansion-service:build . As well as the Python SDK and the Java docker image.

2 - Ran the following command:

python3 -m apache_beam.yaml.main --runner=DataflowRunner --project={project} --region=us-east4 --temp_location=gs://{gcs_temp_loc}/temp/ --pipeline_spec_file=/home/ferran_fernandez/read_k5.yml --staging_location=gs://{gcs_staging_loc}/staging --sdk_location="/home/ferran_fernandez/beam/sdks/python/dist/apache-beam-2.52.0.dev0.tar.gz" --sdk_harness_container_image_overrides=".*java.*,us-east4-docker.pkg.dev/{project}/beam/beam_java8_sdk:latest" --streaming

I got the following error:

INFO:root:Starting a JAR-based expansion service from JAR /home/ferran_fernandez/beam/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar 
INFO:apache_beam.utils.subprocess_server:Starting service with ['java' '-jar' '/home/ferran_fernandez/beam/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar' '53199' '--filesToStage=/home/ferran_fernandez/beam/sdks/java/io/expansion-service/build/libs/beam-sdks-java-io-expansion-service-2.52.0-SNAPSHOT.jar']
INFO:apache_beam.utils.subprocess_server:Starting expansion service at localhost:53199
INFO:apache_beam.utils.subprocess_server:Oct 16, 2023 10:16:36 AM org.apache.beam.sdk.expansion.service.ExpansionService loadRegisteredTransforms
INFO:apache_beam.utils.subprocess_server:INFO: Registering external transforms: [beam:transform:org.apache.beam:kafka_read_with_metadata:v1, beam:transform:org.apache.beam:kafka_read_without_metadata:v1, beam:transform:org.apache.beam:kafka_write:v1, beam:external:java:generate_sequence:v1, beam:transform:combine_globally:v1, beam:transform:combine_grouped_values:v1, beam:transform:combine_per_key:v1, beam:transform:create_view:v1, beam:transform:flatten:v1, beam:transform:group_by_key:v1, beam:transform:group_into_batches:v1, beam:transform:group_into_batches_with_sharded_key:v1, beam:transform:impulse:v1, beam:transform:reshuffle:v1, beam:transform:sdf_process_keyed_elements:v1, beam:transform:teststream:v1, beam:transform:window_into:v1, beam:transform:write_files:v1]
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered transforms:
INFO:apache_beam.utils.subprocess_server:       beam:transform:org.apache.beam:kafka_read_with_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@7c469c48
INFO:apache_beam.utils.subprocess_server:       beam:transform:org.apache.beam:kafka_read_without_metadata:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@12e61fe6
INFO:apache_beam.utils.subprocess_server:       beam:transform:org.apache.beam:kafka_write:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@7ee955a8
INFO:apache_beam.utils.subprocess_server:       beam:external:java:generate_sequence:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@1677d1
INFO:apache_beam.utils.subprocess_server:       beam:transform:combine_globally:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@48fa0f47
INFO:apache_beam.utils.subprocess_server:       beam:transform:combine_grouped_values:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@6ac13091
INFO:apache_beam.utils.subprocess_server:       beam:transform:combine_per_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@5e316c74
INFO:apache_beam.utils.subprocess_server:       beam:transform:create_view:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@6d2a209c
INFO:apache_beam.utils.subprocess_server:       beam:transform:flatten:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@75329a49
INFO:apache_beam.utils.subprocess_server:       beam:transform:group_by_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@161479c6
INFO:apache_beam.utils.subprocess_server:       beam:transform:group_into_batches:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@4313f5bc
INFO:apache_beam.utils.subprocess_server:       beam:transform:group_into_batches_with_sharded_key:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@7f010382
INFO:apache_beam.utils.subprocess_server:       beam:transform:impulse:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@1e802ef9
INFO:apache_beam.utils.subprocess_server:       beam:transform:reshuffle:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@2b6faea6
INFO:apache_beam.utils.subprocess_server:       beam:transform:sdf_process_keyed_elements:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@778d1062
INFO:apache_beam.utils.subprocess_server:       beam:transform:teststream:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@670002
INFO:apache_beam.utils.subprocess_server:       beam:transform:window_into:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@1f0f1111
INFO:apache_beam.utils.subprocess_server:       beam:transform:write_files:v1: org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$55/226744878@49c386c8
INFO:apache_beam.utils.subprocess_server:
INFO:apache_beam.utils.subprocess_server:Registered SchemaTransformProviders:
INFO:apache_beam.utils.subprocess_server:       beam:schematransform:org.apache.beam:kafka_read:v1
INFO:apache_beam.utils.subprocess_server:       beam:schematransform:org.apache.beam:kafka_write:v1
INFO:apache_beam.utils.subprocess_server:Oct 16, 2023 10:16:37 AM org.apache.beam.sdk.expansion.service.ExpansionService expand
INFO:apache_beam.utils.subprocess_server:INFO: Expanding 'ReadFromKafka/beam:schematransform:org.apache.beam:kafka_read:v1' with URN 'beam:expansion:payload:schematransform:v1'
INFO:apache_beam.utils.subprocess_server:Dependencies list: {}
Traceback (most recent call last):
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 452, in expand_leaf_transform
    outputs = inputs | scope.unique_name(spec, ptransform) >> ptransform
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 651, in apply
    return self.apply(
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 203, in apply
    return self.apply_PTransform(transform, input, options)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 207, in apply_PTransform
    return transform.expand(input)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/yaml/yaml_transform.py", line 370, in recording_expand
    result = original_expand(pvalue)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/transforms/external.py", line 425, in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pvalue.py", line 137, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 651, in apply
    return self.apply(
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 662, in apply
    return self.apply(transform, pvalueish)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/pipeline.py", line 708, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 203, in apply
    return self.apply_PTransform(transform, input, options)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/runners/runner.py", line 207, in apply_PTransform
    return transform.expand(input)
  File "/home/ferran_fernandez/beam/sdks/python/apache_beam/transforms/external.py", line 730, in expand
    raise RuntimeError(response.error)
RuntimeError: java.lang.IllegalStateException: Missing required properties: topicPattern keyCoder valueCoder watermarkFn maxReadTime startReadTime stopReadTime watchTopicPartitionDuration offsetConsumerConfig keyDeserializerProvider valueDeserializerProvider checkStopReadingFn
        at org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Read$Builder.build(AutoValue_KafkaIO_Read.java:585)
        at org.apache.beam.sdk.io.kafka.KafkaIO.read(KafkaIO.java:574)
        at org.apache.beam.sdk.io.kafka.KafkaIO.readBytes(KafkaIO.java:553)
        at org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider$3.expand(KafkaReadSchemaTransformProvider.java:131)
        at org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformProvider$3.expand(KafkaReadSchemaTransformProvider.java:127)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
        at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:467)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:620)
        at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:704)
        at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

I tried to rollback my code and use the original code but I got the same result. My expectation here is that I'm not building this in the right way (most likely I think it has to do with the autovalues but I'm not sure).

Is there any gradlew command I should be using?

@ffernandez92
Copy link
Contributor

ffernandez92 commented Oct 16, 2023

Disregard this last comment... I forgot to add the kafka client while building the expansion service... It works as expected now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants