From de6d1c9a9d9bdd524590ed22ba40e1fff2cb2980 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Tue, 24 Jan 2023 23:50:49 +0000 Subject: [PATCH 01/12] Refactor kafka-cluster outside-services --- .test-infra/jenkins/Kubernetes.groovy | 33 +++++++++++++++++++ ...rformanceTests_xlang_KafkaIO_Python.groovy | 7 ++-- .../04-outside-services/outside-0.yml | 2 +- .../04-outside-services/outside-1.yml | 2 +- .../04-outside-services/outside-2.yml | 2 +- .../05-kafka/10broker-config.yml | 10 ++---- .test-infra/kubernetes/kubernetes.sh | 16 +++++++++ 7 files changed, 58 insertions(+), 14 deletions(-) diff --git a/.test-infra/jenkins/Kubernetes.groovy b/.test-infra/jenkins/Kubernetes.groovy index 957c823cbd88..8fb33efcd31f 100644 --- a/.test-infra/jenkins/Kubernetes.groovy +++ b/.test-infra/jenkins/Kubernetes.groovy @@ -116,6 +116,39 @@ class Kubernetes { } } + /** + * Specifies steps that will save specified address node ip address + * as an environment variable that can be used in later steps if needed. + * + * @param referenceName - name of the environment variable + */ + void nodeIPAddress(String referenceName) { + jobs.steps { + String command = "${KUBERNETES_SCRIPT} nodeIPAddress" + shell("set -eo pipefail; eval ${command} | sed 's/^/${referenceName}=/' > job.properties") + environmentVariables { + propertiesFile('job.properties') + } + } + } + + /** + * Specifies steps that will save the NodePort of the specified service + * as an environment variable that can be used in later steps if needed. + * + * @param serviceName - name of the load balancer Kubernetes service + * @param referenceName - name of the environment variable + */ + void nodePort(String serviceName, String referenceName) { + job.steps { + String command = "${KUBERNETES_SCRIPT} nodePort ${serviceName}" + shell("set -eo pipefail; eval ${command} | sed 's/^/${referenceName}=/' > job.properties") + environmentVariables { + propertiesFile('job.properties') + } + } + } + /** * Specifies steps that will save specified load balancer serivce address * as an environment variable that can be used in later steps if needed. diff --git a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy index 142921c277a4..8ed5456e692b 100644 --- a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy @@ -90,14 +90,14 @@ private void createKafkaIOTestJob(testJob) { } } k8s.apply(kafkaDir) - (0..2).each { k8s.loadBalancerIP("outside-$it", "KAFKA_BROKER_$it") } + k8s.nodeIPAddress("NODE_IP") + (0..2).each { k8s.nodePort("outside-$it", "NODE_PORT_$it") } k8s.waitForJob(kafkaTopicJob,"40m") additionalPipelineArgs = [ influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName, influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl, - bootstrap_servers: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1," + - "\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", //KAFKA_BROKER_ represents IP and KAFKA_SERVICE_ port of outside services + bootstrap_servers: "\$NODE_IP:\$NODE_PORT_0,\$NODE_IP:\$NODE_PORT_1,\$NODE_IP:\$NODE_PORT_2", ] testJob.pipelineOptions.putAll(additionalPipelineArgs) @@ -106,6 +106,7 @@ private void createKafkaIOTestJob(testJob) { project : 'apache-beam-testing', region : 'us-central1', temp_location : 'gs://temp-storage-for-perf-tests/', + usePublicIPs : false, filename_prefix : "gs://temp-storage-for-perf-tests/${testJob.name}/\${BUILD_ID}/", sdk_harness_container_image_overrides: '.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest' ] diff --git a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-0.yml b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-0.yml index e7513ec1b89a..653c5043b6dc 100644 --- a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-0.yml +++ b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-0.yml @@ -26,4 +26,4 @@ spec: targetPort: 9094 port: 32400 nodePort: 32400 - type: LoadBalancer + type: NodePort diff --git a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-1.yml b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-1.yml index 50e5fb0650bd..bc146d868159 100644 --- a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-1.yml +++ b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-1.yml @@ -26,4 +26,4 @@ spec: targetPort: 9094 port: 32401 nodePort: 32401 - type: LoadBalancer + type: NodePort diff --git a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-2.yml b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-2.yml index 87c324b8eea9..886246c40198 100644 --- a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-2.yml +++ b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-2.yml @@ -26,4 +26,4 @@ spec: targetPort: 9094 port: 32402 nodePort: 32402 - type: LoadBalancer + type: NodePort diff --git a/.test-infra/kubernetes/kafka-cluster/05-kafka/10broker-config.yml b/.test-infra/kubernetes/kafka-cluster/05-kafka/10broker-config.yml index c465e026e48c..b662ae7b253b 100644 --- a/.test-infra/kubernetes/kafka-cluster/05-kafka/10broker-config.yml +++ b/.test-infra/kubernetes/kafka-cluster/05-kafka/10broker-config.yml @@ -38,14 +38,8 @@ data: SEDS+=("s/#init#broker.rack=#init#/broker.rack=$ZONE/") LABELS="$LABELS kafka-broker-rack=$ZONE" fi - OUTSIDE_HOST="" - while [ -z $OUTSIDE_HOST ]; do - echo "Waiting for end point..." - OUTSIDE_HOST=$(kubectl get svc outside-${KAFKA_BROKER_ID} --template="{{range .status.loadBalancer.ingress}}{{.ip}}{{end}}") - [ -z "$OUTSIDE_HOST" ] && sleep 10 - done - # OUTSIDE_HOST=$(kubectl get node "$NODE_NAME" -o jsonpath='{.status.addresses[?(@.type=="InternalIP")].address}') - OUTSIDE_PORT=$(kubectl get svc outside-${KAFKA_BROKER_ID} --template="{{range .spec.ports}}{{.port}}{{end}}") + OUTSIDE_HOST=$(kubectl get node "$NODE_NAME" -o jsonpath='{.status.addresses[?(@.type=="InternalIP")].address}') + OUTSIDE_PORT=$(kubectl get svc outside-${KAFKA_BROKER_ID} -o jsonpath='{.spec.ports[0].nodePort}') SEDS+=("s|#init#advertised.listeners=OUTSIDE://#init#|advertised.listeners=OUTSIDE://${OUTSIDE_HOST}:${OUTSIDE_PORT}|") ANNOTATIONS="$ANNOTATIONS kafka-listener-outside-host=$OUTSIDE_HOST kafka-listener-outside-port=$OUTSIDE_PORT" diff --git a/.test-infra/kubernetes/kubernetes.sh b/.test-infra/kubernetes/kubernetes.sh index 1ec3cd9364c7..07f639e8f898 100755 --- a/.test-infra/kubernetes/kubernetes.sh +++ b/.test-infra/kubernetes/kubernetes.sh @@ -80,6 +80,22 @@ function deleteNamespace() { eval "kubectl --kubeconfig=${KUBECONFIG} delete namespace $1" } +# Gets Node IP address of cluster +# Usage: ./kubernetes.sh nodeIPAddress +function nodeIPAddress() { + local command="$KUBECTL get node -ojsonpath='{.items[*].status.addresses[?(@.type==\"InternalIP\")].address}'" +} + +# Gets NodePort of service +# Blocks and retries until the IP is present or retry limit is exceeded. +# +# Usage: ./kubernetes.sh nodePort +function nodePort() { + local name=$1 + local command="$KUBECTL get svc $name -ojsonpath='{.status.ports[0].nodePort}'" + retry "${command}" 36 10 +} + # Gets Load Balancer Ingress IP address. # Blocks and retries until the IP is present or retry limit is exceeded. # From 09af9aae7f58302a2613fcfe5a4c1e1864035fd8 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Wed, 25 Jan 2023 00:24:36 +0000 Subject: [PATCH 02/12] Patch misspelling --- .test-infra/jenkins/Kubernetes.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.test-infra/jenkins/Kubernetes.groovy b/.test-infra/jenkins/Kubernetes.groovy index 8fb33efcd31f..580ebcf7fe43 100644 --- a/.test-infra/jenkins/Kubernetes.groovy +++ b/.test-infra/jenkins/Kubernetes.groovy @@ -123,7 +123,7 @@ class Kubernetes { * @param referenceName - name of the environment variable */ void nodeIPAddress(String referenceName) { - jobs.steps { + job.steps { String command = "${KUBERNETES_SCRIPT} nodeIPAddress" shell("set -eo pipefail; eval ${command} | sed 's/^/${referenceName}=/' > job.properties") environmentVariables { From 85e901a541dc152ad712bebd7b6d879fd87e5f4c Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Wed, 25 Jan 2023 18:49:22 +0000 Subject: [PATCH 03/12] Add IaC gcp/networking --- .test-infra/terraform/OWNERS | 4 ++ .test-infra/terraform/README.md | 30 +++++++++++++ .../terraform/google-cloud-platform/README.md | 44 +++++++++++++++++++ .../networking/backend.tf | 29 ++++++++++++ .../google-cloud-platform/networking/data.tf | 40 +++++++++++++++++ .../google-cloud-platform/networking/nat.tf | 40 +++++++++++++++++ .../networking/provider.tf | 22 ++++++++++ .../networking/variables.tf | 38 ++++++++++++++++ 8 files changed, 247 insertions(+) create mode 100644 .test-infra/terraform/OWNERS create mode 100644 .test-infra/terraform/README.md create mode 100644 .test-infra/terraform/google-cloud-platform/README.md create mode 100644 .test-infra/terraform/google-cloud-platform/networking/backend.tf create mode 100644 .test-infra/terraform/google-cloud-platform/networking/data.tf create mode 100644 .test-infra/terraform/google-cloud-platform/networking/nat.tf create mode 100644 .test-infra/terraform/google-cloud-platform/networking/provider.tf create mode 100644 .test-infra/terraform/google-cloud-platform/networking/variables.tf diff --git a/.test-infra/terraform/OWNERS b/.test-infra/terraform/OWNERS new file mode 100644 index 000000000000..d4ebbd97fbb5 --- /dev/null +++ b/.test-infra/terraform/OWNERS @@ -0,0 +1,4 @@ +# See the OWNERS docs at https://s.apache.org/beam-owners + +reviewers: + - damondouglas diff --git a/.test-infra/terraform/README.md b/.test-infra/terraform/README.md new file mode 100644 index 000000000000..60e4d6f984af --- /dev/null +++ b/.test-infra/terraform/README.md @@ -0,0 +1,30 @@ + + +# Overview + +This folder contains Infrastructure-as-Code implemented using +[terraform](https://terraform.io) to provision resources for Apache Beam tests. +Subfolders organize code according to the target environement. + +# Requirements + +Usage of this code minimally requires the terraform command-line interface. +See [terraform](https://terraform.io) for further details. See individual +subfolders for additional requirements for the target environment. \ No newline at end of file diff --git a/.test-infra/terraform/google-cloud-platform/README.md b/.test-infra/terraform/google-cloud-platform/README.md new file mode 100644 index 000000000000..a7a7861b28b0 --- /dev/null +++ b/.test-infra/terraform/google-cloud-platform/README.md @@ -0,0 +1,44 @@ + + +# Overview + +This folder contains Infrastructure-as-Code implemented using +[terraform](https://terraform.io) to provision resources for Apache Beam tests +requiring Google Cloud resources. + +# Requirements + +Usage of this code requires: + +- [terraform cli v1.2.0 and later](https://terraform.io) +- [Google Cloud SDK](https://cloud.google.com/sdk); `gcloud init` + and `gcloud auth` +- Google Cloud project with billing enabled +- IntelliJ or VS Code terraform plugin (optional but **highly** recommended) + +# Usage + +Each folder contains all the code to achieve a category of +provisioning. For example, the [networking](networking) folder contains +all the necessary code to provision Google Cloud related networking resources. + +To provision resources in a chosen folder follow the conventional terraform +workflow. See https://developer.hashicorp.com/terraform/intro/core-workflow +for details. diff --git a/.test-infra/terraform/google-cloud-platform/networking/backend.tf b/.test-infra/terraform/google-cloud-platform/networking/backend.tf new file mode 100644 index 000000000000..0816ebcc915f --- /dev/null +++ b/.test-infra/terraform/google-cloud-platform/networking/backend.tf @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +// Tells terraform to store state files in Google Cloud storage. +// The Google Cloud storage bucket name must be hard coded. +// See https://developer.hashicorp.com/terraform/language/settings/backends/configuration +// for details. +terraform { + backend "gcs" { + bucket = "b507e468-52e9-4e72-83e5-ecbf563eda12" + prefix = "terraform/projects/apache-beam-testing/networking" + } +} \ No newline at end of file diff --git a/.test-infra/terraform/google-cloud-platform/networking/data.tf b/.test-infra/terraform/google-cloud-platform/networking/data.tf new file mode 100644 index 000000000000..c36395fc2897 --- /dev/null +++ b/.test-infra/terraform/google-cloud-platform/networking/data.tf @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +// Query the Virtual Private Cloud network +data "google_compute_network" "default" { + name = var.network +} + +// Query the Virtual Private Cloud subnetwork +// Additionally checks whether Private Google Access is true +data "google_compute_subnetwork" "default" { + name = var.subnetwork + region = var.region + lifecycle { + postcondition { + condition = self.private_ip_google_access + error_message = < Date: Wed, 25 Jan 2023 18:56:31 +0000 Subject: [PATCH 04/12] Patch README --- .../networking/README.md | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 .test-infra/terraform/google-cloud-platform/networking/README.md diff --git a/.test-infra/terraform/google-cloud-platform/networking/README.md b/.test-infra/terraform/google-cloud-platform/networking/README.md new file mode 100644 index 000000000000..07fc26226e10 --- /dev/null +++ b/.test-infra/terraform/google-cloud-platform/networking/README.md @@ -0,0 +1,47 @@ + + +# Overview + +This folder contains Infrastructure-as-Code implemented using +[terraform](https://terraform.io) to provision resources +for Apache Beam tests requiring networking specific Google Cloud resources. + +Code in the folder: +- Validates the Google Cloud Virtual Private Cloud (VPC) subnetwork has +has [private Google Access](https://cloud.google.com/vpc/docs/private-google-access) +turned on +- Provisions a [Cloud NAT](https://cloud.google.com/nat/docs/overview) and +[Cloud Router](https://cloud.google.com/network-connectivity/docs/router/concepts/overview) + +# Requirements + +Usage of this code requires: + +- [terraform cli v1.2.0 and later](https://terraform.io) +- [Google Cloud SDK](https://cloud.google.com/sdk); `gcloud init` + and `gcloud auth` +- Google Cloud project with billing enabled +- IntelliJ or VS Code terraform plugin (optional but **highly** recommended) + +# Usage + +To provision resources in a this folder follow the conventional terraform +workflow. See https://developer.hashicorp.com/terraform/intro/core-workflow +for details. From cfda0e016ba8f3b401384f374d9718828786c6ea Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Wed, 25 Jan 2023 19:10:43 +0000 Subject: [PATCH 05/12] Patch usePublicIps=false documentation --- .../jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy | 2 +- .../terraform/google-cloud-platform/networking/README.md | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy index 8ed5456e692b..6df0580ae80c 100644 --- a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy @@ -106,7 +106,7 @@ private void createKafkaIOTestJob(testJob) { project : 'apache-beam-testing', region : 'us-central1', temp_location : 'gs://temp-storage-for-perf-tests/', - usePublicIPs : false, + usePublicIPs : false, // See .test-infra/terraform/google-cloud-platform/networking filename_prefix : "gs://temp-storage-for-perf-tests/${testJob.name}/\${BUILD_ID}/", sdk_harness_container_image_overrides: '.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest' ] diff --git a/.test-infra/terraform/google-cloud-platform/networking/README.md b/.test-infra/terraform/google-cloud-platform/networking/README.md index 07fc26226e10..753119f7082d 100644 --- a/.test-infra/terraform/google-cloud-platform/networking/README.md +++ b/.test-infra/terraform/google-cloud-platform/networking/README.md @@ -23,7 +23,8 @@ This folder contains Infrastructure-as-Code implemented using [terraform](https://terraform.io) to provision resources for Apache Beam tests requiring networking specific Google Cloud resources. -Code in the folder: +To support the [`usePublicIps=false`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.html#setUsePublicIps-java.lang.Boolean-) +flag for Dataflow jobs, code in the folder: - Validates the Google Cloud Virtual Private Cloud (VPC) subnetwork has has [private Google Access](https://cloud.google.com/vpc/docs/private-google-access) turned on From eab53a9c55b691df7a0960df246c980e927bbcb1 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Wed, 25 Jan 2023 20:07:56 +0000 Subject: [PATCH 06/12] Refactor KafkaIO to use NodePort --- .test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy index d513dd96a7e2..fa89f4a295d2 100644 --- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy @@ -65,13 +65,15 @@ job(jobName) { } } k8s.apply(kafkaDir) - (0..2).each { k8s.loadBalancerIP("outside-$it", "KAFKA_BROKER_$it") } + k8s.nodeIPAddress("NODE_IP") + (0..2).each { k8s.nodePort("outside-$it", "NODE_PORT_$it") } k8s.waitForJob(kafkaTopicJob,"40m") Map pipelineOptions = [ tempRoot : 'gs://temp-storage-for-perf-tests', project : 'apache-beam-testing', runner : 'DataflowRunner', + usePublicIPs : false, // See .test-infra/terraform/google-cloud-platform/networking sourceOptions : """ { "numRecords": "100000000", @@ -84,8 +86,7 @@ job(jobName) { influxMeasurement : 'kafkaioit_results', influxDatabase : InfluxDBCredentialsHelper.InfluxDBDatabaseName, influxHost : InfluxDBCredentialsHelper.InfluxDBHostUrl, - kafkaBootstrapServerAddresses: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1," + - "\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", //KAFKA_BROKER_ represents IP and KAFKA_SERVICE_ port of outside services + kafkaBootstrapServerAddresses: "\$NODE_IP:\$NODE_PORT_0,\$NODE_IP:\$NODE_PORT_1,\$NODE_IP:\$NODE_PORT_2", kafkaTopic : 'beam-batch', readTimeout : '1800', numWorkers : '5', From cfe7e263f6cb4042fde7eea651fb511c3f7c2926 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Wed, 25 Jan 2023 23:07:10 +0000 Subject: [PATCH 07/12] Patch nodeport jsonpath --- .test-infra/jenkins/Kubernetes.groovy | 5 +++-- .../jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 8 +++++--- .../job_PerformanceTests_xlang_KafkaIO_Python.groovy | 8 +++++--- .test-infra/kubernetes/kubernetes.sh | 7 ++++--- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/.test-infra/jenkins/Kubernetes.groovy b/.test-infra/jenkins/Kubernetes.groovy index 580ebcf7fe43..eb4a98290203 100644 --- a/.test-infra/jenkins/Kubernetes.groovy +++ b/.test-infra/jenkins/Kubernetes.groovy @@ -120,11 +120,12 @@ class Kubernetes { * Specifies steps that will save specified address node ip address * as an environment variable that can be used in later steps if needed. * + * @param nodeIndex - index of node * @param referenceName - name of the environment variable */ - void nodeIPAddress(String referenceName) { + void nodeIPAddress(int nodeIndex, String referenceName) { job.steps { - String command = "${KUBERNETES_SCRIPT} nodeIPAddress" + String command = "${KUBERNETES_SCRIPT} nodeIPAddress ${nodeIndex}" shell("set -eo pipefail; eval ${command} | sed 's/^/${referenceName}=/' > job.properties") environmentVariables { propertiesFile('job.properties') diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy index fa89f4a295d2..fbf986deda91 100644 --- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy @@ -65,8 +65,10 @@ job(jobName) { } } k8s.apply(kafkaDir) - k8s.nodeIPAddress("NODE_IP") - (0..2).each { k8s.nodePort("outside-$it", "NODE_PORT_$it") } + (0..2).each { + k8s.nodeIPAddress(it, "NODE_IP_$it") + k8s.nodePort("outside-$it", "NODE_PORT_$it") + } k8s.waitForJob(kafkaTopicJob,"40m") Map pipelineOptions = [ @@ -86,7 +88,7 @@ job(jobName) { influxMeasurement : 'kafkaioit_results', influxDatabase : InfluxDBCredentialsHelper.InfluxDBDatabaseName, influxHost : InfluxDBCredentialsHelper.InfluxDBHostUrl, - kafkaBootstrapServerAddresses: "\$NODE_IP:\$NODE_PORT_0,\$NODE_IP:\$NODE_PORT_1,\$NODE_IP:\$NODE_PORT_2", + kafkaBootstrapServerAddresses: "\$NODE_IP_0:\$NODE_PORT_0,\$NODE_IP_1:\$NODE_PORT_1,\$NODE_IP_2:\$NODE_PORT_2", kafkaTopic : 'beam-batch', readTimeout : '1800', numWorkers : '5', diff --git a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy index 6df0580ae80c..d005c60401b8 100644 --- a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy @@ -90,14 +90,16 @@ private void createKafkaIOTestJob(testJob) { } } k8s.apply(kafkaDir) - k8s.nodeIPAddress("NODE_IP") - (0..2).each { k8s.nodePort("outside-$it", "NODE_PORT_$it") } + (0..2).each { + k8s.nodeIPAddress(it, "NODE_IP_$it") + k8s.nodePort("outside-$it", "NODE_PORT_$it") + } k8s.waitForJob(kafkaTopicJob,"40m") additionalPipelineArgs = [ influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName, influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl, - bootstrap_servers: "\$NODE_IP:\$NODE_PORT_0,\$NODE_IP:\$NODE_PORT_1,\$NODE_IP:\$NODE_PORT_2", + bootstrap_servers: "\$NODE_IP_0:\$NODE_PORT_0,\$NODE_IP_1:\$NODE_PORT_1,\$NODE_IP_2:\$NODE_PORT_2", ] testJob.pipelineOptions.putAll(additionalPipelineArgs) diff --git a/.test-infra/kubernetes/kubernetes.sh b/.test-infra/kubernetes/kubernetes.sh index 07f639e8f898..636c4c4026da 100755 --- a/.test-infra/kubernetes/kubernetes.sh +++ b/.test-infra/kubernetes/kubernetes.sh @@ -81,9 +81,10 @@ function deleteNamespace() { } # Gets Node IP address of cluster -# Usage: ./kubernetes.sh nodeIPAddress +# Usage: ./kubernetes.sh nodeIPAddress nodeIndex function nodeIPAddress() { - local command="$KUBECTL get node -ojsonpath='{.items[*].status.addresses[?(@.type==\"InternalIP\")].address}'" + local nodeIndex=$1 + local command="$KUBECTL get node -ojsonpath='{.items[${nodeIndex}].status.addresses[?(@.type==\"InternalIP\")].address}'" } # Gets NodePort of service @@ -92,7 +93,7 @@ function nodeIPAddress() { # Usage: ./kubernetes.sh nodePort function nodePort() { local name=$1 - local command="$KUBECTL get svc $name -ojsonpath='{.status.ports[0].nodePort}'" + local command="$KUBECTL get svc $name -ojsonpath='{.spec.ports[0].nodePort}'" retry "${command}" 36 10 } From a97706f830e43d99ddfd381edb5327fa8a3ec920 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Thu, 26 Jan 2023 17:12:44 +0000 Subject: [PATCH 08/12] Fix no signature of method: Boolean.replaceAll() --- .test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 2 +- .../jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy index fbf986deda91..b04d851247f7 100644 --- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy @@ -75,7 +75,7 @@ job(jobName) { tempRoot : 'gs://temp-storage-for-perf-tests', project : 'apache-beam-testing', runner : 'DataflowRunner', - usePublicIPs : false, // See .test-infra/terraform/google-cloud-platform/networking + usePublicIPs : 'false', // See .test-infra/terraform/google-cloud-platform/networking sourceOptions : """ { "numRecords": "100000000", diff --git a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy index d005c60401b8..f8697dedeccf 100644 --- a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy @@ -108,7 +108,7 @@ private void createKafkaIOTestJob(testJob) { project : 'apache-beam-testing', region : 'us-central1', temp_location : 'gs://temp-storage-for-perf-tests/', - usePublicIPs : false, // See .test-infra/terraform/google-cloud-platform/networking + usePublicIPs : 'false', // See .test-infra/terraform/google-cloud-platform/networking filename_prefix : "gs://temp-storage-for-perf-tests/${testJob.name}/\${BUILD_ID}/", sdk_harness_container_image_overrides: '.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest' ] From efe996fb18d1895c42489d8f9b522556d40164d4 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Thu, 26 Jan 2023 19:43:00 +0000 Subject: [PATCH 09/12] P vs p --- .test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 2 +- .../jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy index b04d851247f7..b2ec524ec091 100644 --- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy @@ -75,7 +75,7 @@ job(jobName) { tempRoot : 'gs://temp-storage-for-perf-tests', project : 'apache-beam-testing', runner : 'DataflowRunner', - usePublicIPs : 'false', // See .test-infra/terraform/google-cloud-platform/networking + usePublicIps : 'false', // See .test-infra/terraform/google-cloud-platform/networking sourceOptions : """ { "numRecords": "100000000", diff --git a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy index f8697dedeccf..fe4e58a8362b 100644 --- a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy @@ -108,7 +108,7 @@ private void createKafkaIOTestJob(testJob) { project : 'apache-beam-testing', region : 'us-central1', temp_location : 'gs://temp-storage-for-perf-tests/', - usePublicIPs : 'false', // See .test-infra/terraform/google-cloud-platform/networking + usePublicIps : 'false', // See .test-infra/terraform/google-cloud-platform/networking filename_prefix : "gs://temp-storage-for-perf-tests/${testJob.name}/\${BUILD_ID}/", sdk_harness_container_image_overrides: '.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest' ] From 495b89c569af92a15ac8ff56fd7faf14f6b99c0b Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Thu, 26 Jan 2023 21:12:03 +0000 Subject: [PATCH 10/12] change NODE_IP to KAFKA_BROKER_ --- .test-infra/jenkins/Kubernetes.groovy | 2 +- .test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 4 ++-- .../jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.test-infra/jenkins/Kubernetes.groovy b/.test-infra/jenkins/Kubernetes.groovy index eb4a98290203..eaf025c4e182 100644 --- a/.test-infra/jenkins/Kubernetes.groovy +++ b/.test-infra/jenkins/Kubernetes.groovy @@ -117,7 +117,7 @@ class Kubernetes { } /** - * Specifies steps that will save specified address node ip address + * Specifies steps that will save specified node ip address * as an environment variable that can be used in later steps if needed. * * @param nodeIndex - index of node diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy index b2ec524ec091..aafe5257cecb 100644 --- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy @@ -66,7 +66,7 @@ job(jobName) { } k8s.apply(kafkaDir) (0..2).each { - k8s.nodeIPAddress(it, "NODE_IP_$it") + k8s.nodeIPAddress(it, "KAFKA_BROKER_$it") k8s.nodePort("outside-$it", "NODE_PORT_$it") } k8s.waitForJob(kafkaTopicJob,"40m") @@ -88,7 +88,7 @@ job(jobName) { influxMeasurement : 'kafkaioit_results', influxDatabase : InfluxDBCredentialsHelper.InfluxDBDatabaseName, influxHost : InfluxDBCredentialsHelper.InfluxDBHostUrl, - kafkaBootstrapServerAddresses: "\$NODE_IP_0:\$NODE_PORT_0,\$NODE_IP_1:\$NODE_PORT_1,\$NODE_IP_2:\$NODE_PORT_2", + kafkaBootstrapServerAddresses: "\$KAFKA_BROKER_0:\$NODE_PORT_0,\$KAFKA_BROKER_1:\$NODE_PORT_1,\$KAFKA_BROKER_2:\$NODE_PORT_2", kafkaTopic : 'beam-batch', readTimeout : '1800', numWorkers : '5', diff --git a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy index fe4e58a8362b..00f3dbd8c817 100644 --- a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy @@ -91,7 +91,7 @@ private void createKafkaIOTestJob(testJob) { } k8s.apply(kafkaDir) (0..2).each { - k8s.nodeIPAddress(it, "NODE_IP_$it") + k8s.nodeIPAddress(it, "KAFKA_BROKER_$it") k8s.nodePort("outside-$it", "NODE_PORT_$it") } k8s.waitForJob(kafkaTopicJob,"40m") @@ -99,7 +99,7 @@ private void createKafkaIOTestJob(testJob) { additionalPipelineArgs = [ influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName, influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl, - bootstrap_servers: "\$NODE_IP_0:\$NODE_PORT_0,\$NODE_IP_1:\$NODE_PORT_1,\$NODE_IP_2:\$NODE_PORT_2", + bootstrap_servers: "\$KAFKA_BROKER_0:\$NODE_PORT_0,\$KAFKA_BROKER_1:\$NODE_PORT_1,\$KAFKA_BROKER_2:\$NODE_PORT_2", ] testJob.pipelineOptions.putAll(additionalPipelineArgs) From 30e2291699614d55bb3254edda5f0a494acc3791 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Tue, 31 Jan 2023 15:55:38 +0000 Subject: [PATCH 11/12] Only query internal IP --- .test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 1 - .../jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy | 1 - 2 files changed, 2 deletions(-) diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy index aafe5257cecb..38e1f50af208 100644 --- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy @@ -67,7 +67,6 @@ job(jobName) { k8s.apply(kafkaDir) (0..2).each { k8s.nodeIPAddress(it, "KAFKA_BROKER_$it") - k8s.nodePort("outside-$it", "NODE_PORT_$it") } k8s.waitForJob(kafkaTopicJob,"40m") diff --git a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy index 00f3dbd8c817..518b5f0a15db 100644 --- a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy @@ -92,7 +92,6 @@ private void createKafkaIOTestJob(testJob) { k8s.apply(kafkaDir) (0..2).each { k8s.nodeIPAddress(it, "KAFKA_BROKER_$it") - k8s.nodePort("outside-$it", "NODE_PORT_$it") } k8s.waitForJob(kafkaTopicJob,"40m") From c4a00eb41741429b25a891bee20facc994a05403 Mon Sep 17 00:00:00 2001 From: Damon Douglas Date: Tue, 31 Jan 2023 15:57:43 +0000 Subject: [PATCH 12/12] Change NODE_PORT to KAFKA_SERVICE_PORT --- .test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy | 2 +- .../jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy index 38e1f50af208..c9009445cea7 100644 --- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy @@ -87,7 +87,7 @@ job(jobName) { influxMeasurement : 'kafkaioit_results', influxDatabase : InfluxDBCredentialsHelper.InfluxDBDatabaseName, influxHost : InfluxDBCredentialsHelper.InfluxDBHostUrl, - kafkaBootstrapServerAddresses: "\$KAFKA_BROKER_0:\$NODE_PORT_0,\$KAFKA_BROKER_1:\$NODE_PORT_1,\$KAFKA_BROKER_2:\$NODE_PORT_2", + kafkaBootstrapServerAddresses: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1,\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", kafkaTopic : 'beam-batch', readTimeout : '1800', numWorkers : '5', diff --git a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy index 518b5f0a15db..7cd3bf3f0140 100644 --- a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy @@ -98,7 +98,7 @@ private void createKafkaIOTestJob(testJob) { additionalPipelineArgs = [ influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName, influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl, - bootstrap_servers: "\$KAFKA_BROKER_0:\$NODE_PORT_0,\$KAFKA_BROKER_1:\$NODE_PORT_1,\$KAFKA_BROKER_2:\$NODE_PORT_2", + bootstrap_servers: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1,\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", ] testJob.pipelineOptions.putAll(additionalPipelineArgs)