Merge pull request #15093: [BEAM-12383] Go Kafka integration test and framework changes.

[BEAM-12383] Go Kafka integration test and framework changes.
youngoli authored Jun 29, 2021
2 parents 5fffad6 + 7158943 commit b86fcf9
Showing 9 changed files with 408 additions and 63 deletions.
28 changes: 22 additions & 6 deletions sdks/go/pkg/beam/testing/ptest/ptest.go
@@ -28,12 +28,6 @@ import (
_ "github.com/apache/beam/sdks/go/pkg/beam/runners/direct"
)

var (
// expansionAddr is the endpoint for an expansion service for cross-language
// transforms.
ExpansionAddr = flag.String("expansion_addr", "", "Address of Expansion Service")
)

// TODO(herohde) 7/10/2017: add hooks to verify counters, logs, etc.

// Create creates a pipeline and a PCollection with the given values.
@@ -122,3 +116,25 @@ func MainWithDefault(m *testing.M, runner string) {
beam.Init()
os.Exit(m.Run())
}

// MainRet is equivalent to Main, but returns an exit code to pass to os.Exit().
//
// Example:
//
// func TestMain(m *testing.M) {
// os.Exit(ptest.MainRet(m))
// }
func MainRet(m *testing.M) int {
return MainRetWithDefault(m, "direct")
}

// MainRetWithDefault is equivalent to MainWithDefault but returns an exit code
// to pass to os.Exit().
func MainRetWithDefault(m *testing.M, runner string) int {
defaultRunner = runner
if !flag.Parsed() {
flag.Parse()
}
beam.Init()
return m.Run()
}
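
Returning the exit code instead of exiting inside the helper matters when a test package needs deferred cleanup to run before the process terminates. Below is a minimal sketch, not part of this diff, of that usage; setupExternalResource is a hypothetical placeholder for per-package setup.

package sometest // hypothetical test package, not part of this diff

import (
	"os"
	"testing"

	"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
)

// setupExternalResource stands in for any per-package setup (e.g. starting a
// local service) and returns a cleanup function.
func setupExternalResource() func() {
	return func() {}
}

func TestMain(m *testing.M) {
	// Defer os.Exit first so every other deferred call runs before the
	// process exits.
	var code int
	defer func() { os.Exit(code) }()

	teardown := setupExternalResource()
	defer teardown()

	code = ptest.MainRetWithDefault(m, "direct")
}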
8 changes: 4 additions & 4 deletions sdks/go/test/build.gradle
@@ -61,7 +61,7 @@ task dataflowValidatesRunner() {
def options = [
"--runner dataflow",
"--dataflow_worker_jar ${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}",
"--expansion_service_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
if (project.jenkins.isCIBuild) { // Needed when running this task on Jenkins.
options.add("--jenkins")
@@ -84,7 +84,7 @@ task flinkValidatesRunner {
def options = [
"--runner flink",
"--flink_job_server_jar ${project(":runners:flink:${project.ext.latestFlinkVersion}:job-server").shadowJar.archivePath}",
"--expansion_service_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
if (project.jenkins.isCIBuild) { // Needed when running this task on Jenkins.
options.add("--jenkins")
@@ -106,7 +106,7 @@ task sparkValidatesRunner {
def options = [
"--runner spark",
"--spark_job_server_jar ${project(":runners:spark:2:job-server").shadowJar.archivePath}",
"--expansion_service_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
if (project.jenkins.isCIBuild) { // Needed when running this task on Jenkins.
options.add("--jenkins")
@@ -135,7 +135,7 @@ task ulrValidatesRunner {
doLast {
def options = [
"--runner portable",
"--expansion_service_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
"--test_expansion_jar ${project(":sdks:java:testing:expansion-service").buildTestExpansionServiceJar.archivePath}",
]
if (project.jenkins.isCIBuild) { // Needed when running this task on Jenkins.
options.add("--jenkins")
42 changes: 42 additions & 0 deletions sdks/go/test/integration/flags.go
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import "flag"

// The following flags are used in one or more integration tests, and may be
// set by scripts that execute "go test ./sdks/go/test/integration/...".
// Because flags passed to those commands apply to every package, each
// integration test package must import these flags even if it does not use them.
var (
// TestExpansionAddr is the endpoint for the expansion service for test-only
// cross-language transforms.
TestExpansionAddr = flag.String("test_expansion_addr", "", "Address of Expansion Service for test cross-language transforms.")

// IoExpansionAddr is the endpoint for the expansion service for
// cross-language IO transforms.
IoExpansionAddr = flag.String("io_expansion_addr", "", "Address of Expansion Service for cross-language IOs.")

// BootstrapServers is the address of the bootstrap servers for a Kafka
// cluster, used for Kafka IO tests.
BootstrapServers = flag.String("bootstrap_servers", "",
"URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.")

// KafkaJar is a filepath to a jar for starting a Kafka cluster, used for
// Kafka IO tests.
KafkaJar = flag.String("kafka_jar", "",
"The filepath to a jar for starting up a Kafka cluster. Only used if boostrap_servers is unspecified.")
)
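
As a rough illustration (not part of this diff) of how a test package consumes these shared flags: importing this package registers them, and each test can skip itself when a flag it needs is unset. The package and test names below are hypothetical.

package xlangtest // hypothetical integration test package

import (
	"testing"

	"github.com/apache/beam/sdks/go/test/integration"
)

func TestNeedsTestExpansion(t *testing.T) {
	// Skip rather than fail when the required service address was not provided.
	if *integration.TestExpansionAddr == "" {
		t.Skip("No test expansion address provided.")
	}
	// ... build and run the cross-language pipeline here ...
}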
3 changes: 2 additions & 1 deletion sdks/go/test/integration/integration.go
@@ -21,7 +21,8 @@
// should be placed in smaller sub-packages for organizational purposes and
// parallelism (tests are only run in parallel across different packages).
// Integration tests should always begin with a call to CheckFilters to ensure
// test filters can be applied.
// test filters can be applied, and each package containing integration tests
// should call ptest.Main in a TestMain function if it uses ptest.
//
// Running integration tests can be done with a go test call with any flags that
// are required by the test pipelines, such as --runner or --endpoint.
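
To make the documented pattern concrete, here is a minimal sketch of the shape an integration test package is expected to take; the package and test names are hypothetical and the pipeline body is a trivial stand-in.

package exampletest // hypothetical integration test package

import (
	"testing"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
	"github.com/apache/beam/sdks/go/test/integration"
)

// Every integration test starts by applying the shared filters.
func TestExample(t *testing.T) {
	integration.CheckFilters(t)
	p, s := beam.NewPipelineWithRoot()
	col := beam.CreateList(s, []int{1, 2, 3})
	passert.Count(s, col, "example count", 3)
	ptest.RunAndValidate(t, p)
}

// TestMain routes through ptest so runner flags are parsed and the default
// runner is set.
func TestMain(m *testing.M) {
	ptest.Main(m)
}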
65 changes: 65 additions & 0 deletions sdks/go/test/integration/io/xlang/kafka/jar.go
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"net"
"os"
"os/exec"
"strconv"
"time"
)

// kafkaCluster contains anything needed to use and clean up the Kafka cluster
// once it's been started.
type kafkaCluster struct {
proc *os.Process // The process information for the running jar.
bootstrapAddr string // The bootstrap address to connect to Kafka.
}

// runLocalKafka takes a Kafka jar filepath and runs a local Kafka cluster,
// returning the bootstrap server for that cluster.
func runLocalKafka(jar string) (*kafkaCluster, error) {
port, err := getOpenPort()
if err != nil {
return nil, err
}
kafkaPort := strconv.Itoa(port)
port, err = getOpenPort()
if err != nil {
return nil, err
}
zookeeperPort := strconv.Itoa(port)

cmd := exec.Command("java", "-jar", jar, kafkaPort, zookeeperPort)
err = cmd.Start()
if err != nil {
return nil, err
}
time.Sleep(3 * time.Second) // Wait a bit for the cluster to start.

return &kafkaCluster{proc: cmd.Process, bootstrapAddr: "localhost:" + kafkaPort}, nil
}

// getOpenPort gets an open TCP port and returns it, or an error on failure.
func getOpenPort() (int, error) {
listener, err := net.Listen("tcp", ":0")
if err != nil {
return 0, err
}
defer listener.Close()
return listener.Addr().(*net.TCPAddr).Port, nil
}
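
The fixed three-second sleep in runLocalKafka is a simple heuristic. As a rough alternative sketch (not part of this diff), the caller could poll the Kafka port until it accepts TCP connections, with a deadline; waitForPort is a hypothetical helper, and a successful dial only shows the port is open, not that the broker has finished initializing.

package kafka

import (
	"fmt"
	"net"
	"time"
)

// waitForPort dials addr repeatedly until a TCP connection succeeds or the
// timeout elapses.
func waitForPort(addr string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		conn, err := net.DialTimeout("tcp", addr, time.Second)
		if err == nil {
			conn.Close()
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timed out waiting for %v: %v", addr, err)
		}
		time.Sleep(250 * time.Millisecond)
	}
}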
77 changes: 77 additions & 0 deletions sdks/go/test/integration/io/xlang/kafka/kafka.go
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package kafka contains integration tests for cross-language Kafka IO
// transforms.
package kafka

import (
"bytes"
"fmt"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/go/pkg/beam/io/xlang/kafkaio"
"github.com/apache/beam/sdks/go/pkg/beam/testing/passert"
"github.com/google/uuid"
)

func appendUuid(prefix string) string {
return fmt.Sprintf("%v_%v", prefix, uuid.New())
}

// Constants for the BasicPipeline.
const (
numRecords = 1000
basicTopic = "xlang_kafkaio_basic_test"
)

// BasicPipeline creates a pipeline that writes and then reads a range of ints
// to and from a Kafka topic and asserts that all elements are present. This
// function requires an expansion service address and a Kafka bootstrap server
// address.
func BasicPipeline(expansionAddr, bootstrapAddr string) *beam.Pipeline {
topic := appendUuid(basicTopic)
inputs := make([]int, numRecords)
for i := 0; i < numRecords; i++ {
inputs[i] = i
}
p, s := beam.NewPipelineWithRoot()
ins := beam.CreateList(s, inputs)

// Write to Kafka
encoded := beam.ParDo(s, func(i int) ([]byte, error) {
var buf bytes.Buffer
err := coder.EncodeVarInt(int64(i), &buf)
return buf.Bytes(), err
}, ins)
keyed := beam.ParDo(s, func(b []byte) ([]byte, []byte) {
return []byte(""), b
}, encoded)
kafkaio.Write(s, expansionAddr, bootstrapAddr, topic, keyed)

// Read from Kafka
reads := kafkaio.Read(s, expansionAddr, bootstrapAddr, []string{topic})
vals := beam.DropKey(s, reads)
decoded := beam.ParDo(s, func(b []byte) (int, error) {
buf := bytes.NewBuffer(b)
i, err := coder.DecodeVarInt(buf)
return int(i), err
}, vals)

passert.Equals(s, decoded, ins)

return p
}
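
For reference, a hedged sketch (not part of this diff) of driving BasicPipeline outside the test harness; the addresses below are placeholders that would normally come from flags.

package main // hypothetical standalone driver

import (
	"context"
	"flag"
	"log"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/go/test/integration/io/xlang/kafka"
)

func main() {
	flag.Parse()
	beam.Init()
	// Placeholder addresses for the expansion service and Kafka brokers.
	p := kafka.BasicPipeline("localhost:8097", "localhost:9092")
	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Pipeline failed: %v", err)
	}
}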
72 changes: 72 additions & 0 deletions sdks/go/test/integration/io/xlang/kafka/kafka_test.go
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"log"
"os"
"testing"

_ "github.com/apache/beam/sdks/go/pkg/beam/runners/dataflow"
_ "github.com/apache/beam/sdks/go/pkg/beam/runners/flink"
_ "github.com/apache/beam/sdks/go/pkg/beam/runners/spark"
"github.com/apache/beam/sdks/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/go/test/integration"
)

// bootstrapAddr should be set by TestMain once a Kafka cluster has been
// started, and is used by each test.
var bootstrapAddr string

func checkFlags(t *testing.T) {
if *integration.IoExpansionAddr == "" {
t.Skip("No IO expansion address provided.")
}
if bootstrapAddr == "" {
t.Skip("No bootstrap server address provided.")
}
}

// TestBasicPipeline tests a basic Kafka pipeline that writes to and reads from
// Kafka with no optional parameters or extra features.
func TestBasicPipeline(t *testing.T) {
integration.CheckFilters(t)
checkFlags(t)
p := BasicPipeline(*integration.IoExpansionAddr, bootstrapAddr)
ptest.RunAndValidate(t, p)
}

// TestMain starts up a Kafka cluster from integration.KafkaJar before running
// tests through ptest.Main.
func TestMain(m *testing.M) {
// Defer os.Exit so it happens after other defers.
var retCode int
defer func() { os.Exit(retCode) }()

// Start local Kafka cluster and defer its shutdown.
if *integration.BootstrapServers != "" {
bootstrapAddr = *integration.BootstrapServers
} else if *integration.KafkaJar != "" {
cluster, err := runLocalKafka(*integration.KafkaJar)
if err != nil {
log.Fatalf("Kafka cluster failed to start: %v", err)
}
defer func() { cluster.proc.Kill() }()
bootstrapAddr = cluster.bootstrapAddr
}

retCode = ptest.MainRet(m)
}