[Bug]: Cannot use python ReadFromKafka via DirectRunner in CI #31167

Closed
portikCoder opened this issue May 3, 2024 · 1 comment

@portikCoder

portikCoder commented May 3, 2024

What happened?

Hi

I'm hoping somebody out there has already figured this out (if it's possible at all), and that I'm just not setting up the environment correctly to CI-test my whole pipeline.

Context: I'm using Redpanda as a Kafka replacement and trying to create integration tests that don't involve any third-party (3pp) runner, i.e. running on the DirectRunner. I've already managed to use the Dataflow runner from within the CI environment, so that's not the issue. However, as you know, ReadFromKafka relies on two external dependencies from the Java SDK in order to function:
the beam-sdks-java-io-expansion-service-2.5X.0.jar expansion service, and the apache/beam_java21_sdk:2.5X.0 Docker container.
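To make the two Java-side dependencies concrete, here is a small hypothetical helper (the function names are illustrative, not part of Beam) that builds the expansion service download URL and the SDK container image name for a given Beam version. The URL pattern is copied from the curl command in the CI job below. The Java major version is kept as a parameter because the log below shows Beam pulling apache/beam_java17_sdk even though apache/beam_java21_sdk was pre-pulled.

```python
# Hypothetical helpers: derive the Java-side artifacts ReadFromKafka depends on.
# The Maven URL mirrors the curl command in the .gitlab-ci.yml job; the image
# name mirrors the docker pull commands in the CI log.

MAVEN_BASE = "https://repo.maven.apache.org/maven2/org/apache/beam"
ARTIFACT = "beam-sdks-java-io-expansion-service"


def expansion_service_jar_url(beam_version: str) -> str:
    """Maven Central URL of the IO expansion service JAR for this Beam version."""
    file_name = f"{ARTIFACT}-{beam_version}.jar"
    return f"{MAVEN_BASE}/{ARTIFACT}/{beam_version}/{file_name}"


def java_sdk_image(beam_version: str, java_major: int) -> str:
    """Java SDK harness image name, e.g. apache/beam_java17_sdk:2.52.0."""
    return f"apache/beam_java{java_major}_sdk:{beam_version}"


if __name__ == "__main__":
    print(expansion_service_jar_url("2.52.0"))
    print(java_sdk_image("2.52.0", 17))
```

Pre-pulling the image name returned for the Java version the expansion service actually selects (17 in the log) is what would let the later pull be a cache hit.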

On my local machine (M1 Mac) I can set this up and it runs successfully, but on CI I've run into multiple issues.
I was able to resolve most of them by manually downloading the expansion service JAR, starting it, and pointing ReadFromKafka at it (ReadFromKafka(..., expansion_service="localhost:8097")). Based on the logs, the Apache Beam pipeline running on the DirectRunner was able to connect to it. I also manually pulled the Docker image, which Beam was then able to spin up. Here are the logs from the CI (GitLab) worker:

$ netstat -an | grep 8097 || echo "Port 8097 is available"
Port 8097 is available
$ java -jar ${EXPANSION_FILE_NAME} 8097 &
Starting expansion service at localhost:8097
...

docker pull docker.io/apache/beam_java21_sdk:2.52.0
...
Status: Image is up to date for apache/beam_java21_sdk:2.52.0
...

$ netstat -an | grep 8097
tcp        0      0 0.0.0.0:8097            0.0.0.0:*               LISTEN     
$ python -m beam_streamer local --topic-name mytopicname
...
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting control server on port 45871
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting data server on port 33019
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting state server on port 36783
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:starting logging server on port 41913
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.DockerSdkWorkerHandler object at 0x7db95812c950> for environment external_1beam:env:docker:v1 (beam:env:docker:v1, b'\n\x1dapache/beam_java17_sdk:2.52.0')
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Attempting to pull image apache/beam_java17_sdk:2.52.0
2.52.0: Pulling from apache/beam_java17_sdk
Digest: sha256:93d5cc70211b2e6d73ce3dda909599401e6c7c6a1ebf28fd8479426509237f25
Status: Image is up to date for apache/beam_java17_sdk:2.52.0
docker.io/apache/beam_java17_sdk:2.52.0
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Waiting for docker to start up. Current status is running
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Docker container is running. container_id = b'e600645ec4041e08d42fc3d59e85ddd00abac0207ebee078bf9528b9658b3e79', worker_id = worker_0
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7db95812dc10> for environment ref_Environment_default_environment_2 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2024/04/25 21:37:27 Failed to obtain provisioning information: failed to dial server at localhost:45871\n\tcaused by:\ncontext deadline exceeded\n'
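One way to read the failure at the end of this log: the Docker worker dialed the correct port (45871, the port the control server started on), but on localhost. A small hypothetical script (not part of Beam) that pulls both values out of the log lines makes that comparison explicit:

```python
import re

# Two lines copied from the DirectRunner log above (split for readability).
LOG = [
    "INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:"
    "starting control server on port 45871",
    "INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:"
    "b'2024/04/25 21:37:27 Failed to obtain provisioning information: "
    "failed to dial server at localhost:45871\\n\\tcaused by:\\n"
    "context deadline exceeded\\n'",
]

CONTROL_RE = re.compile(r"starting control server on port (\d+)")
DIAL_RE = re.compile(r"failed to dial server at ([\w.\-]+):(\d+)")


def control_vs_dial(lines):
    """Return (control_port, dialed_host, dialed_port) found in the log lines."""
    control_port = dialed_host = dialed_port = None
    for line in lines:
        if m := CONTROL_RE.search(line):
            control_port = int(m.group(1))
        if m := DIAL_RE.search(line):
            dialed_host, dialed_port = m.group(1), int(m.group(2))
    return control_port, dialed_host, dialed_port


if __name__ == "__main__":
    port, host, dialed = control_vs_dial(LOG)
    # Matching ports suggest the open question is the host/network the
    # container resolves "localhost" to, not the port number itself.
    print(port, host, dialed, port == dialed)
```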

I have a feeling the Docker container is using the wrong host, but looking into apache_beam.runners.portability.fn_api_runner.worker_handlers.DockerSdkWorkerHandler, it's not possible to set my own host name for the container, so I have no way to change it. That might explain the "context deadline exceeded" error.
I've tried setting the Kafka address in the following three ways:

KAFKA_ADDRS = "host.docker.internal:19092"
KAFKA_ADDRS = "localhost:19092"
KAFKA_ADDRS = "127.0.0.1:19092"

all with effectively the same result.
I've obviously also looked through Beam's own test suite, but couldn't find an answer there either.
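Before switching between the three addresses above, it may help to check which of them accepts TCP connections at all. A minimal sketch with hypothetical helper names, assuming it is saved as a script and run from the job container; note that reachability from the job container does not prove that the Java SDK worker container, which actually reads from Kafka, can reach the same address.

```python
import socket
import sys


def is_reachable(addr: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to 'host:port' succeeds within timeout seconds."""
    host, _, port = addr.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:  # refused, timed out, or host name could not be resolved
        return False


def first_reachable(candidates, timeout: float = 2.0):
    """Return the first 'host:port' that accepts a TCP connection, or None."""
    for addr in candidates:
        if is_reachable(addr, timeout):
            return addr
    return None


if __name__ == "__main__":
    # Hypothetical usage (script name is an assumption):
    #   python probe_kafka.py host.docker.internal:19092 localhost:19092 127.0.0.1:19092
    for addr in sys.argv[1:]:
        print(addr, "reachable" if is_reachable(addr) else "unreachable")
```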

Has anybody experienced the same thing, or managed to set this up correctly locally?

Some technical details:
Apache Beam version: 2.52 (with 2.55 I had issues: it didn't download the Docker image and wasn't able to consume locally)
CI: GitLab Premium

  • I've also tried adding a docker:19-dind service, thinking it might be needed for Docker-in-Docker, but ended up not using it because I got the same result as above.
Local: M1 Mac

Sample .py file:

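    from apache_beam import Pipeline
    from apache_beam.io.kafka import ReadFromKafka

    # bootstrap_servers, groupid, topic_name and pipeline_options are defined elsewhere (not shown).
    kafka_config = {
        "bootstrap.servers": bootstrap_servers,
        "group.id": groupid,
        "auto.offset.reset": "earliest",
    }

    with Pipeline(options=pipeline_options) as p:
        kafka_records = p | ReadFromKafka(
            consumer_config=kafka_config,
            topics=[topic_name],
            max_read_time=10,  # seconds; bounds the read so the test pipeline can finish
            expansion_service="localhost:8097",  # expansion service JAR started manually in CI
        )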
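To avoid editing the code when switching between the Kafka addresses, the bootstrap address could be read from the KAFKA_ADDR variable that the CI job below already defines. A minimal sketch with a hypothetical helper; the default localhost:19092 is an assumption matching the OUTSIDE listener Redpanda advertises in the CI service.

```python
import os
from typing import Mapping, Optional

# Assumption: default matches Redpanda's advertised OUTSIDE listener.
DEFAULT_KAFKA_ADDR = "localhost:19092"


def kafka_consumer_config(group_id: str, env: Optional[Mapping[str, str]] = None) -> dict:
    """Build the consumer_config dict for ReadFromKafka from the KAFKA_ADDR variable."""
    env = os.environ if env is None else env
    return {
        "bootstrap.servers": env.get("KAFKA_ADDR", DEFAULT_KAFKA_ADDR),
        "group.id": group_id,
        # read records produced (e.g. by rpk) before the pipeline started
        "auto.offset.reset": "earliest",
    }
```

It would be used as ReadFromKafka(consumer_config=kafka_consumer_config(groupid), ...). Passing env explicitly is optional and mainly keeps the helper testable without touching os.environ.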

And the .gitlab-ci.yml job:

beam streamer integration tests:
  stage: test
  extends:
    - .install_beam_streamer_deps
  image: python:3.11.8
  variables:
    KAFKA_ADDR: 127.0.0.1:19092
    KAFKA_TOPIC: mytopicname
    EXPANSION_FILE_NAME: beam-sdks-java-io-expansion-service-2.52.0.jar

#    DOCKER_HOST: docker0
#    DOCKER_HOST: tcp://localhost:2375
#    DOCKER_DRIVER: overlay2
#    DOCKER_TLS_CERTDIR: ""
  services:
    - name: docker.redpanda.com/redpandadata/redpanda:v23.1.13
      command:
        - redpanda
        - start
        - --smp
        - '1'
        - --memory
        - 256M
        - --reserve-memory
        - 0M
        - --overprovisioned
        - --node-id
        - '0'
        - --kafka-addr
        - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:19092
        - --advertise-kafka-addr
        - PLAINTEXT://redpanda:29092,OUTSIDE://localhost:19092
        - --pandaproxy-addr
        - PLAINTEXT://0.0.0.0:28082,OUTSIDE://0.0.0.0:8082
        - --advertise-pandaproxy-addr
        - PLAINTEXT://redpanda:28082,OUTSIDE://localhost:8082
#    - name: docker:19-dind  # It's for `apache/beam_java21_sdk:2.52.0` that will be spun up by ABeam automatically.
#      command: ["--tls=false"]
  script:
    - curl -LO https://github.com/redpanda-data/redpanda/releases/latest/download/rpk-linux-amd64.zip
    - mkdir -p ~/.local/bin
    - export PATH="$HOME/.local/bin:$PATH"
    - unzip rpk-linux-amd64.zip -d ~/.local/bin/
    - rpk version
    - rpk --brokers ${KAFKA_ADDR} topic create ${KAFKA_TOPIC}
    - printf '%s\n' '{"a":1, "b":2}' '{"b":1, "c":5}' | rpk --brokers ${KAFKA_ADDR} topic produce ${KAFKA_TOPIC}
    - apt-get update && apt-get install -y openjdk-17-jdk
    - java -version
    - apt-get update && apt-get install -y docker.io net-tools
    - docker --version
    - docker pull docker.io/apache/beam_java21_sdk:2.52.0
    - curl "https://repo.maven.apache.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.52.0/${EXPANSION_FILE_NAME}" --output ${EXPANSION_FILE_NAME}
    - netstat -an | grep 8097 || echo "Port 8097 is available"
    - java -jar ${EXPANSION_FILE_NAME} 8097 &
    - sleep 5 #&& curl -v http://localhost:8097/status
    - netstat -an | grep 8097
    - python -m beam_streamer local --topic-name E2E-operations
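The job above waits a fixed sleep 5 before checking whether the expansion service is listening. A sketch of a more robust wait, assuming it is saved as a hypothetical wait_for_port.py and run in place of the sleep/netstat lines, polls the port until it accepts TCP connections:

```python
import socket
import sys
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 0.5) -> bool:
    """Poll host:port until a TCP connection succeeds; give up after timeout seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True  # something is listening, e.g. the expansion service
        except OSError:
            time.sleep(interval)  # not accepting connections yet; retry
    return False


if __name__ == "__main__" and len(sys.argv) == 3:
    # Hypothetical CI usage: python wait_for_port.py localhost 8097
    sys.exit(0 if wait_for_port(sys.argv[1], int(sys.argv[2])) else 1)
```

In the job this would be a step such as python wait_for_port.py localhost 8097 right after starting the JAR. It only confirms that the expansion service is listening inside the job container; it does not address whether the Docker worker can reach the job's control server.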

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@liferoad
Contributor

liferoad commented May 4, 2024

I suggest you post this question to the Beam user list (user@beam.apache.org) or https://stackoverflow.com/questions/tagged/apache-beam. You might get more help there.

From the log you posted, I think the portable runner has trouble starting the Python worker harness. https://beam.apache.org/documentation/runtime/sdk-harness-config/ mentions that you could use LOOPBACK.

@liferoad liferoad closed this as completed May 4, 2024
@github-actions github-actions bot added this to the 2.57.0 Release milestone May 4, 2024