diff --git a/.test-infra/jenkins/Kubernetes.groovy b/.test-infra/jenkins/Kubernetes.groovy index 957c823cbd88..eaf025c4e182 100644 --- a/.test-infra/jenkins/Kubernetes.groovy +++ b/.test-infra/jenkins/Kubernetes.groovy @@ -116,6 +116,40 @@ class Kubernetes { } } + /** + * Specifies steps that will save specified node ip address + * as an environment variable that can be used in later steps if needed. + * + * @param nodeIndex - index of node + * @param referenceName - name of the environment variable + */ + void nodeIPAddress(int nodeIndex, String referenceName) { + job.steps { + String command = "${KUBERNETES_SCRIPT} nodeIPAddress ${nodeIndex}" + shell("set -eo pipefail; eval ${command} | sed 's/^/${referenceName}=/' > job.properties") + environmentVariables { + propertiesFile('job.properties') + } + } + } + + /** + * Specifies steps that will save the NodePort of the specified service + * as an environment variable that can be used in later steps if needed. + * + * @param serviceName - name of the load balancer Kubernetes service + * @param referenceName - name of the environment variable + */ + void nodePort(String serviceName, String referenceName) { + job.steps { + String command = "${KUBERNETES_SCRIPT} nodePort ${serviceName}" + shell("set -eo pipefail; eval ${command} | sed 's/^/${referenceName}=/' > job.properties") + environmentVariables { + propertiesFile('job.properties') + } + } + } + /** * Specifies steps that will save specified load balancer serivce address * as an environment variable that can be used in later steps if needed. diff --git a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy index d513dd96a7e2..c9009445cea7 100644 --- a/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy @@ -65,13 +65,16 @@ job(jobName) { } } k8s.apply(kafkaDir) - (0..2).each { k8s.loadBalancerIP("outside-$it", "KAFKA_BROKER_$it") } + (0..2).each { + k8s.nodeIPAddress(it, "KAFKA_BROKER_$it") + } k8s.waitForJob(kafkaTopicJob,"40m") Map pipelineOptions = [ tempRoot : 'gs://temp-storage-for-perf-tests', project : 'apache-beam-testing', runner : 'DataflowRunner', + usePublicIps : 'false', // See .test-infra/terraform/google-cloud-platform/networking sourceOptions : """ { "numRecords": "100000000", @@ -84,8 +87,7 @@ job(jobName) { influxMeasurement : 'kafkaioit_results', influxDatabase : InfluxDBCredentialsHelper.InfluxDBDatabaseName, influxHost : InfluxDBCredentialsHelper.InfluxDBHostUrl, - kafkaBootstrapServerAddresses: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1," + - "\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", //KAFKA_BROKER_ represents IP and KAFKA_SERVICE_ port of outside services + kafkaBootstrapServerAddresses: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1,\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", kafkaTopic : 'beam-batch', readTimeout : '1800', numWorkers : '5', diff --git a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy index 142921c277a4..7cd3bf3f0140 100644 --- a/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_xlang_KafkaIO_Python.groovy @@ -90,14 +90,15 @@ private void createKafkaIOTestJob(testJob) { } } k8s.apply(kafkaDir) - (0..2).each { k8s.loadBalancerIP("outside-$it", "KAFKA_BROKER_$it") } + (0..2).each { + k8s.nodeIPAddress(it, "KAFKA_BROKER_$it") + } k8s.waitForJob(kafkaTopicJob,"40m") additionalPipelineArgs = [ influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName, influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostUrl, - bootstrap_servers: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1," + - "\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", //KAFKA_BROKER_ represents IP and KAFKA_SERVICE_ port of outside services + bootstrap_servers: "\$KAFKA_BROKER_0:\$KAFKA_SERVICE_PORT_0,\$KAFKA_BROKER_1:\$KAFKA_SERVICE_PORT_1,\$KAFKA_BROKER_2:\$KAFKA_SERVICE_PORT_2", ] testJob.pipelineOptions.putAll(additionalPipelineArgs) @@ -106,6 +107,7 @@ private void createKafkaIOTestJob(testJob) { project : 'apache-beam-testing', region : 'us-central1', temp_location : 'gs://temp-storage-for-perf-tests/', + usePublicIps : 'false', // See .test-infra/terraform/google-cloud-platform/networking filename_prefix : "gs://temp-storage-for-perf-tests/${testJob.name}/\${BUILD_ID}/", sdk_harness_container_image_overrides: '.*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java8_sdk:latest' ] diff --git a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-0.yml b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-0.yml index e7513ec1b89a..653c5043b6dc 100644 --- a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-0.yml +++ b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-0.yml @@ -26,4 +26,4 @@ spec: targetPort: 9094 port: 32400 nodePort: 32400 - type: LoadBalancer + type: NodePort diff --git a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-1.yml b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-1.yml index 50e5fb0650bd..bc146d868159 100644 --- a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-1.yml +++ b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-1.yml @@ -26,4 +26,4 @@ spec: targetPort: 9094 port: 32401 nodePort: 32401 - type: LoadBalancer + type: NodePort diff --git a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-2.yml b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-2.yml index 87c324b8eea9..886246c40198 100644 --- a/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-2.yml +++ b/.test-infra/kubernetes/kafka-cluster/04-outside-services/outside-2.yml @@ -26,4 +26,4 @@ spec: targetPort: 9094 port: 32402 nodePort: 32402 - type: LoadBalancer + type: NodePort diff --git a/.test-infra/kubernetes/kafka-cluster/05-kafka/10broker-config.yml b/.test-infra/kubernetes/kafka-cluster/05-kafka/10broker-config.yml index c465e026e48c..b662ae7b253b 100644 --- a/.test-infra/kubernetes/kafka-cluster/05-kafka/10broker-config.yml +++ b/.test-infra/kubernetes/kafka-cluster/05-kafka/10broker-config.yml @@ -38,14 +38,8 @@ data: SEDS+=("s/#init#broker.rack=#init#/broker.rack=$ZONE/") LABELS="$LABELS kafka-broker-rack=$ZONE" fi - OUTSIDE_HOST="" - while [ -z $OUTSIDE_HOST ]; do - echo "Waiting for end point..." - OUTSIDE_HOST=$(kubectl get svc outside-${KAFKA_BROKER_ID} --template="{{range .status.loadBalancer.ingress}}{{.ip}}{{end}}") - [ -z "$OUTSIDE_HOST" ] && sleep 10 - done - # OUTSIDE_HOST=$(kubectl get node "$NODE_NAME" -o jsonpath='{.status.addresses[?(@.type=="InternalIP")].address}') - OUTSIDE_PORT=$(kubectl get svc outside-${KAFKA_BROKER_ID} --template="{{range .spec.ports}}{{.port}}{{end}}") + OUTSIDE_HOST=$(kubectl get node "$NODE_NAME" -o jsonpath='{.status.addresses[?(@.type=="InternalIP")].address}') + OUTSIDE_PORT=$(kubectl get svc outside-${KAFKA_BROKER_ID} -o jsonpath='{.spec.ports[0].nodePort}') SEDS+=("s|#init#advertised.listeners=OUTSIDE://#init#|advertised.listeners=OUTSIDE://${OUTSIDE_HOST}:${OUTSIDE_PORT}|") ANNOTATIONS="$ANNOTATIONS kafka-listener-outside-host=$OUTSIDE_HOST kafka-listener-outside-port=$OUTSIDE_PORT" diff --git a/.test-infra/kubernetes/kubernetes.sh b/.test-infra/kubernetes/kubernetes.sh index 1ec3cd9364c7..636c4c4026da 100755 --- a/.test-infra/kubernetes/kubernetes.sh +++ b/.test-infra/kubernetes/kubernetes.sh @@ -80,6 +80,23 @@ function deleteNamespace() { eval "kubectl --kubeconfig=${KUBECONFIG} delete namespace $1" } +# Gets Node IP address of cluster +# Usage: ./kubernetes.sh nodeIPAddress nodeIndex +function nodeIPAddress() { + local nodeIndex=$1 + local command="$KUBECTL get node -ojsonpath='{.items[${nodeIndex}].status.addresses[?(@.type==\"InternalIP\")].address}'" +} + +# Gets NodePort of service +# Blocks and retries until the IP is present or retry limit is exceeded. +# +# Usage: ./kubernetes.sh nodePort +function nodePort() { + local name=$1 + local command="$KUBECTL get svc $name -ojsonpath='{.spec.ports[0].nodePort}'" + retry "${command}" 36 10 +} + # Gets Load Balancer Ingress IP address. # Blocks and retries until the IP is present or retry limit is exceeded. # diff --git a/.test-infra/terraform/OWNERS b/.test-infra/terraform/OWNERS new file mode 100644 index 000000000000..d4ebbd97fbb5 --- /dev/null +++ b/.test-infra/terraform/OWNERS @@ -0,0 +1,4 @@ +# See the OWNERS docs at https://s.apache.org/beam-owners + +reviewers: + - damondouglas diff --git a/.test-infra/terraform/README.md b/.test-infra/terraform/README.md new file mode 100644 index 000000000000..60e4d6f984af --- /dev/null +++ b/.test-infra/terraform/README.md @@ -0,0 +1,30 @@ + + +# Overview + +This folder contains Infrastructure-as-Code implemented using +[terraform](https://terraform.io) to provision resources for Apache Beam tests. +Subfolders organize code according to the target environement. + +# Requirements + +Usage of this code minimally requires the terraform command-line interface. +See [terraform](https://terraform.io) for further details. See individual +subfolders for additional requirements for the target environment. \ No newline at end of file diff --git a/.test-infra/terraform/google-cloud-platform/README.md b/.test-infra/terraform/google-cloud-platform/README.md new file mode 100644 index 000000000000..a7a7861b28b0 --- /dev/null +++ b/.test-infra/terraform/google-cloud-platform/README.md @@ -0,0 +1,44 @@ + + +# Overview + +This folder contains Infrastructure-as-Code implemented using +[terraform](https://terraform.io) to provision resources for Apache Beam tests +requiring Google Cloud resources. + +# Requirements + +Usage of this code requires: + +- [terraform cli v1.2.0 and later](https://terraform.io) +- [Google Cloud SDK](https://cloud.google.com/sdk); `gcloud init` + and `gcloud auth` +- Google Cloud project with billing enabled +- IntelliJ or VS Code terraform plugin (optional but **highly** recommended) + +# Usage + +Each folder contains all the code to achieve a category of +provisioning. For example, the [networking](networking) folder contains +all the necessary code to provision Google Cloud related networking resources. + +To provision resources in a chosen folder follow the conventional terraform +workflow. See https://developer.hashicorp.com/terraform/intro/core-workflow +for details. diff --git a/.test-infra/terraform/google-cloud-platform/networking/README.md b/.test-infra/terraform/google-cloud-platform/networking/README.md new file mode 100644 index 000000000000..753119f7082d --- /dev/null +++ b/.test-infra/terraform/google-cloud-platform/networking/README.md @@ -0,0 +1,48 @@ + + +# Overview + +This folder contains Infrastructure-as-Code implemented using +[terraform](https://terraform.io) to provision resources +for Apache Beam tests requiring networking specific Google Cloud resources. + +To support the [`usePublicIps=false`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.html#setUsePublicIps-java.lang.Boolean-) +flag for Dataflow jobs, code in the folder: +- Validates the Google Cloud Virtual Private Cloud (VPC) subnetwork has +has [private Google Access](https://cloud.google.com/vpc/docs/private-google-access) +turned on +- Provisions a [Cloud NAT](https://cloud.google.com/nat/docs/overview) and +[Cloud Router](https://cloud.google.com/network-connectivity/docs/router/concepts/overview) + +# Requirements + +Usage of this code requires: + +- [terraform cli v1.2.0 and later](https://terraform.io) +- [Google Cloud SDK](https://cloud.google.com/sdk); `gcloud init` + and `gcloud auth` +- Google Cloud project with billing enabled +- IntelliJ or VS Code terraform plugin (optional but **highly** recommended) + +# Usage + +To provision resources in a this folder follow the conventional terraform +workflow. See https://developer.hashicorp.com/terraform/intro/core-workflow +for details. diff --git a/.test-infra/terraform/google-cloud-platform/networking/backend.tf b/.test-infra/terraform/google-cloud-platform/networking/backend.tf new file mode 100644 index 000000000000..0816ebcc915f --- /dev/null +++ b/.test-infra/terraform/google-cloud-platform/networking/backend.tf @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +// Tells terraform to store state files in Google Cloud storage. +// The Google Cloud storage bucket name must be hard coded. +// See https://developer.hashicorp.com/terraform/language/settings/backends/configuration +// for details. +terraform { + backend "gcs" { + bucket = "b507e468-52e9-4e72-83e5-ecbf563eda12" + prefix = "terraform/projects/apache-beam-testing/networking" + } +} \ No newline at end of file diff --git a/.test-infra/terraform/google-cloud-platform/networking/data.tf b/.test-infra/terraform/google-cloud-platform/networking/data.tf new file mode 100644 index 000000000000..c36395fc2897 --- /dev/null +++ b/.test-infra/terraform/google-cloud-platform/networking/data.tf @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +// Query the Virtual Private Cloud network +data "google_compute_network" "default" { + name = var.network +} + +// Query the Virtual Private Cloud subnetwork +// Additionally checks whether Private Google Access is true +data "google_compute_subnetwork" "default" { + name = var.subnetwork + region = var.region + lifecycle { + postcondition { + condition = self.private_ip_google_access + error_message = <