diff --git a/sdks/go/pkg/beam/testing/ptest/ptest.go b/sdks/go/pkg/beam/testing/ptest/ptest.go index bb237cb885b..d671a7784bf 100644 --- a/sdks/go/pkg/beam/testing/ptest/ptest.go +++ b/sdks/go/pkg/beam/testing/ptest/ptest.go @@ -28,12 +28,6 @@ import ( _ "github.com/apache/beam/sdks/go/pkg/beam/runners/direct" ) -var ( - // expansionAddr is the endpoint for an expansion service for cross-language - // transforms. - ExpansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service") -) - // TODO(herohde) 7/10/2017: add hooks to verify counters, logs, etc. // Create creates a pipeline and a PCollection with the given values. @@ -122,3 +116,25 @@ func MainWithDefault(m *testing.M, runner string) { beam.Init() os.Exit(m.Run()) } + +// MainRet is equivalent to Main, but returns an exit code to pass to os.Exit(). +// +// Example: +// +// func TestMain(m *testing.M) { +// os.Exit(ptest.MainRet(m)) +// } +func MainRet(m *testing.M) int { + return MainRetWithDefault(m, "direct") +} + +// MainRetWithDefault is equivalent to MainWithDefault but returns an exit code +// to pass to os.Exit(). +func MainRetWithDefault(m *testing.M, runner string) int { + defaultRunner = runner + if !flag.Parsed() { + flag.Parse() + } + beam.Init() + return m.Run() +} diff --git a/sdks/go/test/build.gradle b/sdks/go/test/build.gradle index efd88ac923c..8d979d2e464 100644 --- a/sdks/go/test/build.gradle +++ b/sdks/go/test/build.gradle @@ -61,7 +61,7 @@ task dataflowValidatesRunner() { def options = [ "--runner dataflow", "--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}", - "--expansion_service_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", ] if (project.jenkins.isCIBuild) { // Needed when running this task on Jenkins. 
options.add("--jenkins") @@ -84,7 +84,7 @@ task flinkValidatesRunner { def options = [ "--runner flink", "--flink_job_server_jar ${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}", - "--expansion_service_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", ] if (project.jenkins.isCIBuild) { // Needed when running this task on Jenkins. options.add("--jenkins") @@ -106,7 +106,7 @@ task sparkValidatesRunner { def options = [ "--runner spark", "--spark_job_server_jar ${project(":runners:spark:2:job-server").shadowJar.archivePath}", - "--expansion_service_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", ] if (project.jenkins.isCIBuild) { // Needed when running this task on Jenkins. options.add("--jenkins") @@ -135,7 +135,7 @@ task ulrValidatesRunner { doLast { def options = [ "--runner portable", - "--expansion_service_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", + "--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}", ] if (project.jenkins.isCIBuild) { // Needed when running this task on Jenkins. options.add("--jenkins") diff --git a/sdks/go/test/integration/flags.go b/sdks/go/test/integration/flags.go new file mode 100644 index 00000000000..6eee0e06a99 --- /dev/null +++ b/sdks/go/test/integration/flags.go @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import "flag" + +// The following flags are flags used in one or more integration tests, and that +// may be used by scripts that execute "go test ./sdks/go/test/integration/...". +// Because any flags used with those commands are used for each package, every +// integration test package must import these flags, even if they are not used. +var ( + // TestExpansionAddr is the endpoint for the expansion service for test-only + // cross-language transforms. + TestExpansionAddr = flag.String("test_expansion_addr", "", "Address of Expansion Service for test cross-language transforms.") + + // IoExpansionAddr is the endpoint for the expansion service for + // cross-language IO transforms. + IoExpansionAddr = flag.String("io_expansion_addr", "", "Address of Expansion Service for cross-language IOs.") + + // BootstrapServers is the address of the bootstrap servers for a Kafka + // cluster, used for Kafka IO tests. + BootstrapServers = flag.String("bootstrap_servers", "", + "URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.") + + // KafkaJar is a filepath to a jar for starting a Kafka cluster, used for + // Kafka IO tests. + KafkaJar = flag.String("kafka_jar", "", + "The filepath to a jar for starting up a Kafka cluster. 
Only used if boostrap_servers is unspecified.") +) diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 7db20ac14f9..ec03a4cd140 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -21,7 +21,8 @@ // should be placed in smaller sub-packages for organizational purposes and // parallelism (tests are only run in parallel across different packages). // Integration tests should always begin with a call to CheckFilters to ensure -// test filters can be applied. +// test filters can be applied, and each package containing integration tests +// should call ptest.Main in a TestMain function if it uses ptest. // // Running integration tests can be done with a go test call with any flags that // are required by the test pipelines, such as --runner or --endpoint. diff --git a/sdks/go/test/integration/io/xlang/kafka/jar.go b/sdks/go/test/integration/io/xlang/kafka/jar.go new file mode 100644 index 00000000000..36bffa3e0bf --- /dev/null +++ b/sdks/go/test/integration/io/xlang/kafka/jar.go @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package kafka + +import ( + "net" + "os" + "os/exec" + "strconv" + "time" +) + +// kafkaCluster contains anything needed to use and clean up the Kafka cluster +// once it's been started. +type kafkaCluster struct { + proc *os.Process // The process information for the running jar. + bootstrapAddr string // The bootstrap address to connect to Kafka. +} + +// runLocalKafka takes a Kafka jar filepath and runs a local Kafka cluster, +// returning the bootstrap server for that cluster. +func runLocalKafka(jar string) (*kafkaCluster, error) { + port, err := getOpenPort() + if err != nil { + return nil, err + } + kafkaPort := strconv.Itoa(port) + port, err = getOpenPort() + if err != nil { + return nil, err + } + zookeeperPort := strconv.Itoa(port) + + cmd := exec.Command("java", "-jar", jar, kafkaPort, zookeeperPort) + err = cmd.Start() + if err != nil { + return nil, err + } + time.Sleep(3 * time.Second) // Wait a bit for the cluster to start. + + return &kafkaCluster{proc: cmd.Process, bootstrapAddr: "localhost:" + kafkaPort}, nil +} + +// getOpenPort gets an open TCP port and returns it, or an error on failure. +func getOpenPort() (int, error) { + listener, err := net.Listen("tcp", ":0") + if err != nil { + return 0, err + } + defer listener.Close() + return listener.Addr().(*net.TCPAddr).Port, nil +} diff --git a/sdks/go/test/integration/io/xlang/kafka/kafka.go b/sdks/go/test/integration/io/xlang/kafka/kafka.go new file mode 100644 index 00000000000..a980b613574 --- /dev/null +++ b/sdks/go/test/integration/io/xlang/kafka/kafka.go @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package kafka contains integration tests for cross-language Kafka IO +// transforms. +package kafka + +import ( + "bytes" + "fmt" + + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder" + "github.com/apache/beam/sdks/go/pkg/beam/io/xlang/kafkaio" + "github.com/apache/beam/sdks/go/pkg/beam/testing/passert" + "github.com/google/uuid" +) + +func appendUuid(prefix string) string { + return fmt.Sprintf("%v_%v", prefix, uuid.New()) +} + +// Constants for the BasicPipeline. +const ( + numRecords = 1000 + basicTopic = "xlang_kafkaio_basic_test" +) + +// BasicPipeline creates a pipeline that writes and then reads a range of ints +// to and from a Kafka topic and asserts that all elements are present. This +// function requires an expansion service address and a Kafka bootstrap server +// address. 
+func BasicPipeline(expansionAddr, bootstrapAddr string) *beam.Pipeline { + topic := appendUuid(basicTopic) + inputs := make([]int, numRecords) + for i := 0; i < numRecords; i++ { + inputs[i] = i + } + p, s := beam.NewPipelineWithRoot() + ins := beam.CreateList(s, inputs) + + // Write to Kafka + encoded := beam.ParDo(s, func(i int) ([]byte, error) { + var buf bytes.Buffer + err := coder.EncodeVarInt(int64(i), &buf) + return buf.Bytes(), err + }, ins) + keyed := beam.ParDo(s, func(b []byte) ([]byte, []byte) { + return []byte(""), b + }, encoded) + kafkaio.Write(s, expansionAddr, bootstrapAddr, topic, keyed) + + // Read from Kafka + reads := kafkaio.Read(s, expansionAddr, bootstrapAddr, []string{topic}) + vals := beam.DropKey(s, reads) + decoded := beam.ParDo(s, func(b []byte) (int, error) { + buf := bytes.NewBuffer(b) + i, err := coder.DecodeVarInt(buf) + return int(i), err + }, vals) + + passert.Equals(s, decoded, ins) + + return p +} diff --git a/sdks/go/test/integration/io/xlang/kafka/kafka_test.go b/sdks/go/test/integration/io/xlang/kafka/kafka_test.go new file mode 100644 index 00000000000..bb1e019dcdc --- /dev/null +++ b/sdks/go/test/integration/io/xlang/kafka/kafka_test.go @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "flag" + "log" + "os" + "testing" + + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow" + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink" + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/spark" + "github.com/apache/beam/sdks/go/pkg/beam/testing/ptest" + "github.com/apache/beam/sdks/go/test/integration" +) + +// bootstrapAddr should be set by TestMain once a Kafka cluster has been +// started, and is used by each test. +var bootstrapAddr string + +func checkFlags(t *testing.T) { + if *integration.IoExpansionAddr == "" { + t.Skip("No IO expansion address provided.") + } + if bootstrapAddr == "" { + t.Skip("No bootstrap server address provided.") + } +} + +// TestBasicPipeline tests a basic Kafka pipeline that writes to and reads from +// Kafka with no optional parameters or extra features. +func TestBasicPipeline(t *testing.T) { + integration.CheckFilters(t) + checkFlags(t) + p := BasicPipeline(*integration.IoExpansionAddr, bootstrapAddr) + ptest.RunAndValidate(t, p) +} + +// TestMain parses flags, then picks the Kafka bootstrap address: the +// bootstrap_servers flag if set, otherwise a local cluster started from +// kafka_jar (killed on exit). Tests then run through ptest.MainRet. +func TestMain(m *testing.M) { + // The testing package has not parsed command-line flags when TestMain is + // called, so parse them here before any flag value is read below. + flag.Parse() + + // Defer os.Exit so it happens after other defers. + var retCode int + defer func() { os.Exit(retCode) }() + + // Start local Kafka cluster and defer its shutdown. 
+ if *integration.BootstrapServers != "" { + bootstrapAddr = *integration.BootstrapServers + } else if *integration.KafkaJar != "" { + cluster, err := runLocalKafka(*integration.KafkaJar) + if err != nil { + log.Fatalf("Kafka cluster failed to start: %v", err) + } + defer func() { cluster.proc.Kill() }() + bootstrapAddr = cluster.bootstrapAddr + } + + retCode = ptest.MainRet(m) +} diff --git a/sdks/go/test/integration/xlang/xlang_test.go b/sdks/go/test/integration/xlang/xlang_test.go index 0c931071640..74e57c8c333 100644 --- a/sdks/go/test/integration/xlang/xlang_test.go +++ b/sdks/go/test/integration/xlang/xlang_test.go @@ -45,8 +45,8 @@ func init() { } func checkFlags(t *testing.T) { - if *ptest.ExpansionAddr == "" { - t.Skip("No expansion address provided") + if *integration.TestExpansionAddr == "" { + t.Skip("No expansion address provided.") } } @@ -120,7 +120,7 @@ func TestXLang_Prefix(t *testing.T) { // Using the cross-language transform strings := beam.Create(s, "a", "b", "c") - prefixed := xlang.Prefix(s, "prefix_", *ptest.ExpansionAddr, strings) + prefixed := xlang.Prefix(s, "prefix_", *integration.TestExpansionAddr, strings) passert.Equals(s, prefixed, "prefix_a", "prefix_b", "prefix_c") ptest.RunAndValidate(t, p) @@ -136,7 +136,7 @@ func TestXLang_CoGroupBy(t *testing.T) { // Using the cross-language transform col1 := beam.ParDo(s, getIntString, beam.Create(s, IntString{X: 0, Y: "1"}, IntString{X: 0, Y: "2"}, IntString{X: 1, Y: "3"})) col2 := beam.ParDo(s, getIntString, beam.Create(s, IntString{X: 0, Y: "4"}, IntString{X: 1, Y: "5"}, IntString{X: 1, Y: "6"})) - c := xlang.CoGroupByKey(s, *ptest.ExpansionAddr, col1, col2) + c := xlang.CoGroupByKey(s, *integration.TestExpansionAddr, col1, col2) sums := beam.ParDo(s, sumCounts, c) formatted := beam.ParDo(s, formatIntStringsFn, sums) passert.Equals(s, formatted, "0:[1 2 4]", "1:[3 5 6]") @@ -154,7 +154,7 @@ func TestXLang_Combine(t *testing.T) { // Using the cross-language transform kvs := beam.Create(s, 
StringInt{X: "a", Y: 1}, StringInt{X: "a", Y: 2}, StringInt{X: "b", Y: 3}) ins := beam.ParDo(s, getStringInt, kvs) - c := xlang.CombinePerKey(s, *ptest.ExpansionAddr, ins) + c := xlang.CombinePerKey(s, *integration.TestExpansionAddr, ins) formatted := beam.ParDo(s, formatStringIntFn, c) passert.Equals(s, formatted, "a:3", "b:3") @@ -172,7 +172,7 @@ func TestXLang_CombineGlobally(t *testing.T) { in := beam.CreateList(s, []int64{1, 2, 3}) // Using the cross-language transform - c := xlang.CombineGlobally(s, *ptest.ExpansionAddr, in) + c := xlang.CombineGlobally(s, *integration.TestExpansionAddr, in) formatted := beam.ParDo(s, formatIntFn, c) passert.Equals(s, formatted, "6") @@ -191,7 +191,7 @@ func TestXLang_Flatten(t *testing.T) { col2 := beam.CreateList(s, []int64{4, 5, 6}) // Using the cross-language transform - c := xlang.Flatten(s, *ptest.ExpansionAddr, col1, col2) + c := xlang.Flatten(s, *integration.TestExpansionAddr, col1, col2) formatted := beam.ParDo(s, formatIntFn, c) passert.Equals(s, formatted, "1", "2", "3", "4", "5", "6") @@ -209,7 +209,7 @@ func TestXLang_GroupBy(t *testing.T) { // Using the cross-language transform kvs := beam.Create(s, StringInt{X: "0", Y: 1}, StringInt{X: "0", Y: 2}, StringInt{X: "1", Y: 3}) in := beam.ParDo(s, getStringInt, kvs) - out := xlang.GroupByKey(s, *ptest.ExpansionAddr, in) + out := xlang.GroupByKey(s, *integration.TestExpansionAddr, in) vals := beam.ParDo(s, collectValues, out) formatted := beam.ParDo(s, formatStringIntsFn, vals) @@ -230,7 +230,7 @@ func TestXLang_Multi(t *testing.T) { side := beam.CreateList(s, []string{"s"}) // Using the cross-language transform - mainOut, sideOut := xlang.Multi(s, *ptest.ExpansionAddr, main1, main2, side) + mainOut, sideOut := xlang.Multi(s, *integration.TestExpansionAddr, main1, main2, side) passert.Equals(s, mainOut, "as", "bbs", "xs", "yys", "zzzs") passert.Equals(s, sideOut, "ss") @@ -248,7 +248,7 @@ func TestXLang_Partition(t *testing.T) { col := beam.CreateList(s, []int64{1, 2, 
3, 4, 5, 6}) // Using the cross-language transform - out0, out1 := xlang.Partition(s, *ptest.ExpansionAddr, col) + out0, out1 := xlang.Partition(s, *integration.TestExpansionAddr, col) formatted0 := beam.ParDo(s, formatIntFn, out0) formatted1 := beam.ParDo(s, formatIntFn, out1) diff --git a/sdks/go/test/run_validatesrunner_tests.sh b/sdks/go/test/run_validatesrunner_tests.sh index 00ec4539610..33beedb37b1 100755 --- a/sdks/go/test/run_validatesrunner_tests.sh +++ b/sdks/go/test/run_validatesrunner_tests.sh @@ -30,13 +30,26 @@ # dataflow - Dataflow Runner # # General flags: +# --tests -> A space-separated list of targets for "go test". Defaults to +# all packages in the integration and regression directories. # --timeout -> Timeout for the go test command, on a per-package level. # --endpoint -> An endpoint for an existing job server outside the script. # If present, job server jar flags are ignored. -# --expansion_service_jar -> Filepath to jar for expansion service, for -# runners that support cross-language. -# --expansion_addr -> An endpoint for an existing expansion service outside -# the script. If present, --expansion_service_jar is ignored. +# --test_expansion_jar -> Filepath to jar for an expansion service, for +# runners that support cross-language. The test expansion service is one +# that can expand test-only cross-language transforms. +# --test_expansion_addr -> An endpoint for an existing test expansion service +# outside the script. If present, --test_expansion_jar is ignored. +# --io_expansion_jar -> Filepath to jar for an expansion service, for +# runners that support cross-language. The IO expansion service is one +# that can expand cross-language transforms for Beam IOs. +# --io_expansion_addr -> An endpoint for an existing IO expansion service +# outside the script. If present, --io_expansion_jar is ignored. +# --sdk_overrides -> Only needed if performing cross-language tests with +# a staged SDK harness container. 
Note for Dataflow: Using this flag +# prevents the script from creating and staging a container. +# --pipeline_opts -> Appends additional pipeline options to the test command, +# in addition to those already added by this script. # # Runner-specific flags: # Flink @@ -56,6 +69,9 @@ set -e set -v +# Default test targets. +TESTS="./sdks/go/test/integration/... ./sdks/go/test/regression" + # Default runner. RUNNER=portable @@ -76,8 +92,11 @@ exit_background_processes () { if [[ ! -z "$JOBSERVER_PID" ]]; then kill -9 $JOBSERVER_PID || true fi - if [[ ! -z "$EXPANSION_PID" ]]; then - kill -9 $EXPANSION_PID + if [[ ! -z "$TEST_EXPANSION_PID" ]]; then + kill -9 $TEST_EXPANSION_PID + fi + if [[ ! -z "$IO_EXPANSION_PID" ]]; then + kill -9 $IO_EXPANSION_PID fi } trap exit_background_processes SIGINT SIGTERM EXIT @@ -86,6 +105,11 @@ while [[ $# -gt 0 ]] do key="$1" case $key in + --tests) + TESTS="$2" + shift # past argument + shift # past value + ;; --runner) RUNNER="$2" shift # past argument @@ -136,13 +160,33 @@ case $key in shift # past argument shift # past value ;; - --expansion_service_jar) - EXPANSION_SERVICE_JAR="$2" + --test_expansion_jar) + TEST_EXPANSION_JAR="$2" + shift # past argument + shift # past value + ;; + --test_expansion_addr) + TEST_EXPANSION_ADDR="$2" + shift # past argument + shift # past value + ;; + --io_expansion_jar) + IO_EXPANSION_JAR="$2" + shift # past argument + shift # past value + ;; + --io_expansion_addr) + IO_EXPANSION_ADDR="$2" + shift # past argument + shift # past value + ;; + --sdk_overrides) + SDK_OVERRIDES="$2" shift # past argument shift # past value ;; - --expansion_addr) - EXPANSION_ADDR="$2" + --pipeline_opts) + PIPELINE_OPTS="$2" shift # past argument shift # past value ;; @@ -180,12 +224,19 @@ if [[ "$RUNNER" == "dataflow" ]]; then fi echo "Using Dataflow worker jar: $DATAFLOW_WORKER_JAR" - if [[ -z "$EXPANSION_ADDR" && -n "$EXPANSION_SERVICE_JAR" ]]; then + if [[ -z "$TEST_EXPANSION_ADDR" && -n "$TEST_EXPANSION_JAR" ]]; then + 
EXPANSION_PORT=$(python3 -c "$SOCKET_SCRIPT") + TEST_EXPANSION_ADDR="localhost:$EXPANSION_PORT" + echo "No test expansion address specified; starting a new test expansion server on $TEST_EXPANSION_ADDR" + java -jar $TEST_EXPANSION_JAR $EXPANSION_PORT & + TEST_EXPANSION_PID=$! + fi + if [[ -z "$IO_EXPANSION_ADDR" && -n "$IO_EXPANSION_JAR" ]]; then EXPANSION_PORT=$(python3 -c "$SOCKET_SCRIPT") - EXPANSION_ADDR="localhost:$EXPANSION_PORT" - echo "No expansion address specified; starting a new expansion server on $EXPANSION_ADDR" - java -jar $EXPANSION_SERVICE_JAR $EXPANSION_PORT & - EXPANSION_PID=$! + IO_EXPANSION_ADDR="localhost:$EXPANSION_PORT" + echo "No IO expansion address specified; starting a new IO expansion server on $IO_EXPANSION_ADDR" + java -jar $IO_EXPANSION_JAR $EXPANSION_PORT & + IO_EXPANSION_PID=$! fi elif [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "portable" ]]; then if [[ -z "$ENDPOINT" ]]; then @@ -218,12 +269,19 @@ elif [[ "$RUNNER" == "flink" || "$RUNNER" == "spark" || "$RUNNER" == "portable" JOBSERVER_PID=$! fi - if [[ -z "$EXPANSION_ADDR" && -n "$EXPANSION_SERVICE_JAR" ]]; then + if [[ -z "$TEST_EXPANSION_ADDR" && -n "$TEST_EXPANSION_JAR" ]]; then EXPANSION_PORT=$(python3 -c "$SOCKET_SCRIPT") - EXPANSION_ADDR="localhost:$EXPANSION_PORT" - echo "No expansion address specified; starting a new expansion server on $EXPANSION_ADDR" - java -jar $EXPANSION_SERVICE_JAR $EXPANSION_PORT & - EXPANSION_PID=$! + TEST_EXPANSION_ADDR="localhost:$EXPANSION_PORT" + echo "No test expansion address specified; starting a new test expansion server on $TEST_EXPANSION_ADDR" + java -jar $TEST_EXPANSION_JAR $EXPANSION_PORT & + TEST_EXPANSION_PID=$! 
+ fi + if [[ -z "$IO_EXPANSION_ADDR" && -n "$IO_EXPANSION_JAR" ]]; then + EXPANSION_PORT=$(python3 -c "$SOCKET_SCRIPT") + IO_EXPANSION_ADDR="localhost:$EXPANSION_PORT" + echo "No IO expansion address specified; starting a new IO expansion server on $IO_EXPANSION_ADDR" + java -jar $IO_EXPANSION_JAR $EXPANSION_PORT & + IO_EXPANSION_PID=$! fi fi @@ -261,18 +319,24 @@ if [[ "$RUNNER" == "dataflow" ]]; then # Push the container gcloud docker -- push $CONTAINER:$TAG - if [[ -n "$EXPANSION_ADDR" ]]; then - # Build the java container for cross-language - JAVA_TAG=$(date +%Y%m%d-%H%M%S) - JAVA_CONTAINER=us.gcr.io/$PROJECT/$USER/beam_java11_sdk - echo "Using container $JAVA_CONTAINER for cross-language java transforms" - ./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=us.gcr.io/$PROJECT/$USER -Pdocker-tag=$JAVA_TAG + if [[ -n "$TEST_EXPANSION_ADDR" || -n "$IO_EXPANSION_ADDR" ]]; then + ARGS="$ARGS --experiments=use_portable_job_submission" + + if [[ -z "$SDK_OVERRIDES" ]]; then + # Build the java container for cross-language + JAVA_TAG=$(date +%Y%m%d-%H%M%S) + JAVA_CONTAINER=us.gcr.io/$PROJECT/$USER/beam_java11_sdk + echo "Using container $JAVA_CONTAINER for cross-language java transforms" + ./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=us.gcr.io/$PROJECT/$USER -Pdocker-tag=$JAVA_TAG - # Verify it exists - docker images | grep $JAVA_TAG + # Verify it exists + docker images | grep $JAVA_TAG - # Push the container - gcloud docker -- push $JAVA_CONTAINER:$JAVA_TAG + # Push the container + gcloud docker -- push $JAVA_CONTAINER:$JAVA_TAG + + SDK_OVERRIDES=".*java.*,$JAVA_CONTAINER:$JAVA_TAG" + fi fi else TAG=dev @@ -280,6 +344,7 @@ else CONTAINER=apache/beam_go_sdk fi +# Assemble test arguments and pipeline options. 
ARGS="$ARGS --timeout=$TIMEOUT" ARGS="$ARGS --runner=$RUNNER" ARGS="$ARGS --project=$DATAFLOW_PROJECT" @@ -290,14 +355,17 @@ ARGS="$ARGS --staging_location=$GCS_LOCATION/staging-validatesrunner-test" ARGS="$ARGS --temp_location=$GCS_LOCATION/temp-validatesrunner-test" ARGS="$ARGS --dataflow_worker_jar=$DATAFLOW_WORKER_JAR" ARGS="$ARGS --endpoint=$ENDPOINT" -OVERRIDE=--sdk_harness_container_image_override=".*java.*,$JAVA_CONTAINER:$JAVA_TAG" -ARGS="$ARGS $OVERRIDE" -if [[ -n "$EXPANSION_ADDR" ]]; then - ARGS="$ARGS --expansion_addr=$EXPANSION_ADDR" - if [[ "$RUNNER" == "dataflow" ]]; then - ARGS="$ARGS --experiments=use_portable_job_submission" - fi +if [[ -n "$TEST_EXPANSION_ADDR" ]]; then + ARGS="$ARGS --test_expansion_addr=$TEST_EXPANSION_ADDR" fi +if [[ -n "$IO_EXPANSION_ADDR" ]]; then + ARGS="$ARGS --io_expansion_addr=$IO_EXPANSION_ADDR" +fi +if [[ -n "$SDK_OVERRIDES" ]]; then + OVERRIDE=--sdk_harness_container_image_override="$SDK_OVERRIDES" + ARGS="$ARGS $OVERRIDE" +fi +ARGS="$ARGS $PIPELINE_OPTS" # Running "go test" requires some additional setup on Jenkins. if [[ "$JENKINS" == true ]]; then @@ -309,12 +377,16 @@ if [[ "$JENKINS" == true ]]; then TEMP_GOPATH=$(pwd)/temp_gopath cd ./src - echo ">>> RUNNING $RUNNER VALIDATESRUNNER TESTS" - GOPATH=$TEMP_GOPATH go test -v github.com/apache/beam/sdks/go/test/integration/... github.com/apache/beam/sdks/go/test/regression $ARGS \ + # Search and replace working directory on test targets with new directory. + TESTS="${TESTS//"./"/"github.com/apache/beam/"}" + echo ">>> For Jenkins environment, changing test targets to: $TESTS" + + echo ">>> RUNNING $RUNNER integration tests with pipeline options: $ARGS" + GOPATH=$TEMP_GOPATH go test -v $TESTS $ARGS \ || TEST_EXIT_CODE=$? # don't fail fast here; clean up environment before exiting else - echo ">>> RUNNING $RUNNER VALIDATESRUNNER TESTS" - go test -v ./sdks/go/test/integration/... 
./sdks/go/test/regression $ARGS \ + echo ">>> RUNNING $RUNNER integration tests with pipeline options: $ARGS" + go test -v $TESTS $ARGS \ || TEST_EXIT_CODE=$? # don't fail fast here; clean up environment before exiting fi @@ -323,7 +395,7 @@ if [[ "$RUNNER" == "dataflow" ]]; then docker rmi $CONTAINER:$TAG || echo "Failed to remove container" gcloud --quiet container images delete $CONTAINER:$TAG || echo "Failed to delete container" - if [[ -n "$EXPANSION_ADDR" ]]; then + if [[ -n "$TEST_EXPANSION_ADDR" || -n "$IO_EXPANSION_ADDR" ]]; then # Delete the java cross-language container locally and remotely docker rmi $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to remove container" gcloud --quiet container images delete $JAVA_CONTAINER:$JAVA_TAG || echo "Failed to delete container"