[Feature Request]: Python-based Unbounded WebSocket IO Connector #33229

Open
CWrecker opened this issue Nov 26, 2024 · 7 comments

CWrecker commented Nov 26, 2024

What would you like to happen?

Apache Beam lacks a native Python-based IO connector that can ingest data directly from a socket. This feature would enable users to easily integrate streaming data sources, such as those emitting messages over TCP/IP sockets, into their Apache Beam pipelines.

Many real-time data sources, such as custom data generators, IoT devices, and legacy systems, often send data over sockets. Building a socket-based IO connector in Python would allow Beam pipelines to process this data seamlessly without requiring users to implement custom socket reading logic outside the Beam ecosystem.

Primary question:
Any advice on implementing an unbounded source would be appreciated. I have only recently begun to dig into Apache Beam.

Additional Context

Existing IO connectors in Beam are often geared towards standard services like Kafka, Pub/Sub, etc. Adding support for sockets will cater to users dealing with more specialized or ad-hoc data sources.

Current approach to reading from a WebSocket


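import datetime
import json
import time

import apache_beam as beam
from apache_beam.transforms import trigger
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.window import GlobalWindows
# Assumed: create_connection comes from the websocket-client package.
from websocket import create_connection


class ReadFromWebSocket(beam.DoFn):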
    """
    A custom DoFn to read messages from a WebSocket stream.
    """
    def __init__(self, ws_url):
        """
        Initializes the WebSocket reader with the target URL.
        """
        self.ws_url = ws_url
        self.ws_connection = None

    def setup(self):
        """
        Set up the WebSocket connection.
        """
        self.ws_connection = create_connection(self.ws_url)
        print(f"Connected to WebSocket: {self.ws_url}")

    def process(self, element, *args, **kwargs):
        """
        Reads data from the WebSocket and outputs it as elements.
        """
        try:
            while True:
                # Read from WebSocket
                message = self.ws_connection.recv()
                message = json.loads(message)
                yield beam.window.TimestampedValue(message, datetime.datetime.now().timestamp())
                # Avoid busy-waiting
                time.sleep(0.01)
        except Exception as e:
            print(f"Error while reading WebSocket: {e}")

    def teardown(self):
        """
        Clean up the WebSocket connection.
        """
        if self.ws_connection:
            self.ws_connection.close()
            print(f"Closed WebSocket connection to {self.ws_url}")

Pipeline Example

with beam.Pipeline(options=options) as pipeline:
    # Start with a single dummy element that triggers the custom DoFn.
    (
        pipeline
        | "CreateStart" >> beam.Create([None])
        | "ReadFromWebSocket" >> beam.ParDo(ReadFromWebSocket(ws_url))
        # Global window with a repeated count trigger (not a fixed window).
        | "WindowIntoGlobal" >> beam.WindowInto(
            GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(10)),
            accumulation_mode=AccumulationMode.ACCUMULATING)
        # Extract and sum team/score pairs from the event data.
        | "ExtractAndSumScore" >> ExtractAndSumScore("team")
        | "PrintMessages" >> beam.Map(print)  # Replace with actual processing logic
    )

class ExtractAndSumScore(beam.PTransform):
  """A transform to extract key/score information and sum the scores.
  The constructor argument `field` determines whether 'team' or 'user' info is
  extracted.
  """
  def __init__(self, field):
    # TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.
    # super().__init__()
    beam.PTransform.__init__(self)
    self.field = field

  def expand(self, pcoll):
    print(pcoll)
    return (
        pcoll
        | beam.Map(lambda elem: (elem[self.field], elem['score']))
        | beam.CombinePerKey(sum))

The current pipeline stalls when combined with a window and aggregation.

Issue Priority

Priority: 3 (nice-to-have improvement)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
CWrecker changed the title from "[Feature Request]:" to "[Feature Request]: Python-based Unbounded WebSocket IO Connector" on Nov 26, 2024
@CWrecker
Author

.take-issue

Contributor

Label p2 cannot be managed because it does not exist in the repo. Please check your spelling.

Contributor

Label cannot be managed because it does not exist in the repo. Please check your spelling.


@CWrecker
Author

.set-labels P2,python,io,'new feature'

@damondouglas
Contributor

cc: @damondouglas

@CWrecker
Author

Downstream transforms, windows, and sinks don't appear to run until I terminate the socket connection and the pipeline is drained. Is there a reason for this?
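
One likely explanation, offered as an assumption rather than a confirmed diagnosis: Beam runners generally commit a bundle's output only when the bundle finishes, and the `while True` loop in `process` never returns, so the grouping step behind `CombinePerKey` receives nothing until the connection closes. A minimal sketch of one workaround is to read in bounded batches so that each `process` call can return. The helper below is hypothetical (`read_batch`, `recv`, `max_messages`, and `max_seconds` are illustrative names, not Beam or websocket-client APIs), and it assumes `recv` has been wrapped to return `None` when no message arrives within its own timeout.

```python
import json
import time
from typing import Callable, Iterator, Optional, Tuple


def read_batch(
    recv: Callable[[], Optional[str]],
    max_messages: int = 100,
    max_seconds: float = 1.0,
) -> Iterator[Tuple[float, dict]]:
    """Yield (receive_time, parsed_message) pairs for one bounded batch.

    Assumption: `recv` is a hypothetical adapter around the socket that
    returns the next raw JSON string, or None if nothing arrived before
    its own timeout. Without such a timeout, a single recv() call can
    still block past the deadline checked here.
    """
    deadline = time.monotonic() + max_seconds
    for _ in range(max_messages):
        # Stop once the time budget for this batch is used up.
        if time.monotonic() >= deadline:
            break
        raw = recv()
        # No message available right now: end the batch early.
        if raw is None:
            break
        yield time.time(), json.loads(raw)


if __name__ == "__main__":
    # Stand-in for a socket: two messages, then a timeout (None).
    fake = iter(['{"team": "a", "score": 1}', '{"team": "b", "score": 2}', None])
    for received_at, message in read_batch(lambda: next(fake)):
        print(received_at, message)
```

Inside a DoFn, `process` could iterate over `read_batch(...)` and wrap each pair in `beam.window.TimestampedValue(message, received_at)`. The DoFn would then have to be invoked repeatedly, for example by a periodic impulse source in place of the single `beam.Create([None])` element. A splittable DoFn is the more complete route to a true unbounded source, but it is beyond this sketch.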
