Skip to content

PerformanceTests xlang KafkaIO Python #102

PerformanceTests xlang KafkaIO Python

PerformanceTests xlang KafkaIO Python #102

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: PerformanceTests xlang KafkaIO Python

on:
  # Scheduled run; cron fields are minute=30, hour=11, every day.
  schedule:
    - cron: '30 11 * * *'
  # Lets the workflow be started manually as well.
  workflow_dispatch:
# Declare the token permissions explicitly instead of relying on the defaults,
# which are `write-all` in the case of a pull_request_target event.
permissions:
  actions: write
  pull-requests: write
  checks: write
  contents: read
  deployments: read
  id-token: none
  issues: write
  discussions: read
  packages: read
  pages: read
  repository-projects: read
  security-events: read
  statuses: read
# A newly queued run whose group key matches an in-progress run interrupts
# (cancels) that earlier run.
concurrency:
  # Key = workflow name + issue number / sha / head ref / ref (first non-empty)
  # + schedule / comment id / sender login (first non-empty).
  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
  cancel-in-progress: true
# Workflow-level environment variables, each populated from a repository secret.
env:
  GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
  INFLUXDB_USER: ${{ secrets.INFLUXDB_USER }}
  INFLUXDB_USER_PASSWORD: ${{ secrets.INFLUXDB_USER_PASSWORD }}
jobs:
  beam_PerformanceTests_xlang_KafkaIO_Python:
    # Run on manual dispatch, or on the schedule only inside apache/beam.
    # The last clause reads github.event.comment, which the schedule and
    # workflow_dispatch triggers declared in this file's `on:` section do not
    # provide.
    # NOTE(review): confirm whether the comment-phrase trigger is still intended.
    if: |
      github.event_name == 'workflow_dispatch' ||
      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
      github.event.comment.body == 'Run Python xlang KafkaIO Performance Test'
    runs-on: [self-hosted, ubuntu-20.04, main]
    timeout-minutes: 240
    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
    strategy:
      # Single-entry matrix: supplies the job name and trigger phrase that the
      # steps below reference via matrix.job_name / matrix.job_phrase.
      matrix:
        job_name: ["beam_PerformanceTests_xlang_KafkaIO_Python"]
        job_phrase: ["Run Python xlang KafkaIO Performance Test"]
    steps:
      - uses: actions/checkout@v4
      - name: Setup repository
        uses: ./.github/actions/setup-action
        with:
          comment_phrase: ${{ matrix.job_phrase }}
          github_token: ${{ secrets.GITHUB_TOKEN }}
          # Same text as this job's `name:` above.
          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
      - name: Setup environment
        uses: ./.github/actions/setup-environment-action
        with:
          # Quoted so YAML keeps the version as a string rather than a float.
          python-version: '3.8'
      - name: Authenticate on GCP
        id: auth
        uses: google-github-actions/auth@v1
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}
          project_id: ${{ secrets.GCP_PROJECT_ID }}
      - name: Set k8s access
        uses: ./.github/actions/setup-k8s-access
        with:
          cluster_name: io-datastores-for-tests
          # Namespace name is unique per run: job name + run id.
          k8s_namespace: ${{ matrix.job_name }}-${{ github.run_id }}
      - name: Install Kafka
        id: install_kafka
        # Applies every manifest under the kafka-cluster directory (recursively).
        run: |
          cd "${{ github.workspace }}/.test-infra/kubernetes/kafka-cluster/"
          kubectl apply -R -f .
      - name: Get Kafka IP
        id: kafka_ip
        # For each service outside-0..2: wait up to 120s for a LoadBalancer
        # ingress IP, then publish it as step output KAFKA_BROKER_<n>.
        run: |
          for broker in 0 1 2; do
            kubectl wait "svc/outside-${broker}" --for=jsonpath='{.status.loadBalancer.ingress[0].ip}' --timeout=120s
            broker_ip=$(kubectl get svc "outside-${broker}" -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
            echo "KAFKA_BROKER_${broker}=${broker_ip}" >> "$GITHUB_OUTPUT"
          done
      - name: Prepare test arguments
        uses: ./.github/actions/test-arguments-action
        with:
          test-type: load
          test-language: python
          argument-file-paths: |
            ${{ github.workspace }}/.github/workflows/performance-tests-pipeline-options/xlang_KafkaIO_Python.txt
          # Extra arguments: a per-run output prefix and the three broker
          # addresses collected by the kafka_ip step.
          arguments: |
            --filename_prefix=gs://temp-storage-for-perf-tests/${{ matrix.job_name }}/${{github.run_id}}/
            --bootstrap_servers=${{ steps.kafka_ip.outputs.KAFKA_BROKER_0 }}:32400,${{ steps.kafka_ip.outputs.KAFKA_BROKER_1 }}:32400,${{ steps.kafka_ip.outputs.KAFKA_BROKER_2 }}:32400
      # Runs the Java expansion-service shadowJar task before the Python test.
      # NOTE(review): presumably the jar serves the cross-language Kafka
      # transforms used by the test - confirm.
      - name: run shadowJar
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :sdks:java:io:expansion-service:shadowJar
      # The env variable is created and populated in the test-arguments-action as "<github.job>_test_arguments_<argument_file_paths_index>"
      - name: run integrationTest
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :sdks:python:apache_beam:testing:load_tests:run
          arguments: |
            -Prunner=DataflowRunner \
            -PloadTest.mainClass=apache_beam.io.external.xlang_kafkaio_perf_test \
            -PpythonVersion=3.8 \
            '-PloadTest.args=${{ env.beam_PerformanceTests_xlang_KafkaIO_Python_test_arguments_1 }}'