Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go SDK] Refactor integration tests to use internal containers package for test containers #24823

Merged
merged 1 commit into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions sdks/go/test/integration/internal/containers/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,45 @@ package containers
import (
"context"
"testing"
"time"

"github.com/docker/go-connections/nat"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"gopkg.in/retry.v1"
)

type ContainerOptionFn func(*testcontainers.ContainerRequest)

func WithEnv(env map[string]string) ContainerOptionFn {
return func(option *testcontainers.ContainerRequest) {
option.Env = env
}
}

func WithHostname(hostname string) ContainerOptionFn {
return func(option *testcontainers.ContainerRequest) {
option.Hostname = hostname
}
}

func WithPorts(ports []string) ContainerOptionFn {
return func(option *testcontainers.ContainerRequest) {
option.ExposedPorts = ports
}
}

func WithWaitStrategy(waitStrategy wait.Strategy) ContainerOptionFn {
return func(option *testcontainers.ContainerRequest) {
option.WaitingFor = waitStrategy
}
}

func NewContainer(
ctx context.Context,
t *testing.T,
image string,
maxRetries int,
opts ...ContainerOptionFn,
) testcontainers.Container {
t.Helper()
Expand All @@ -51,9 +73,26 @@ func NewContainer(
Started: true,
}

container, err := testcontainers.GenericContainer(ctx, genericRequest)
if err != nil {
t.Fatalf("error creating container: %v", err)
strategy := retry.LimitCount(
maxRetries,
retry.Exponential{
Initial: time.Second,
Factor: 2,
},
)

var container testcontainers.Container
var err error

for attempt := retry.Start(strategy, nil); attempt.Next(); {
container, err = testcontainers.GenericContainer(ctx, genericRequest)
if err == nil {
break
}

if attempt.Count() == maxRetries {
t.Fatalf("failed to start container with %v retries: %v", maxRetries, err)
}
}

t.Cleanup(func() {
Expand Down
6 changes: 4 additions & 2 deletions sdks/go/test/integration/io/mongodbio/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (

const (
mongoImage = "mongo:6.0.3"
mongoPort = "27017"
mongoPort = "27017/tcp"
maxRetries = 5
)

func setUpTestContainer(ctx context.Context, t *testing.T) string {
Expand All @@ -39,7 +40,8 @@ func setUpTestContainer(ctx context.Context, t *testing.T) string {
ctx,
t,
mongoImage,
containers.WithPorts([]string{mongoPort + "/tcp"}),
maxRetries,
containers.WithPorts([]string{mongoPort}),
)

return containers.Port(ctx, t, container, mongoPort)
Expand Down
44 changes: 20 additions & 24 deletions sdks/go/test/integration/io/xlang/debezium/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,14 @@ import (
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
"github.com/docker/go-connections/nat"
"github.com/apache/beam/sdks/v2/go/test/integration/internal/containers"
_ "github.com/lib/pq"
"github.com/testcontainers/testcontainers-go"
)

const (
debeziumImage = "debezium/example-postgres:latest"
debeziumPort = "5432/tcp"
maxRetries = 5
)

var expansionAddr string // Populate with expansion address labelled "debeziumio".
Expand All @@ -42,46 +47,37 @@ func checkFlags(t *testing.T) {
}
}

func setupTestContainer(t *testing.T, dbname, username, password string) string {
func setupTestContainer(ctx context.Context, t *testing.T, dbname, username, password string) string {
t.Helper()

var env = map[string]string{
env := map[string]string{
"POSTGRES_PASSWORD": password,
"POSTGRES_USER": username,
"POSTGRES_DB": dbname,
}
var port = "5432/tcp"

req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "debezium/example-postgres:latest",
ExposedPorts: []string{port},
Env: env,
},
Started: true,
}
ctx := context.Background()
container, err := testcontainers.GenericContainer(ctx, req)
if err != nil {
t.Fatalf("failed to start container: %v", err)
}
container := containers.NewContainer(
ctx,
t,
debeziumImage,
maxRetries,
containers.WithEnv(env),
containers.WithPorts([]string{debeziumPort}),
)

mappedPort, err := container.MappedPort(ctx, nat.Port(port))
if err != nil {
t.Fatalf("failed to get container external port: %v", err)
}
return mappedPort.Port()
return containers.Port(ctx, t, container, debeziumPort)
}

// TestDebeziumIO_BasicRead tests basic read transform from Debezium.
func TestDebeziumIO_BasicRead(t *testing.T) {
integration.CheckFilters(t)
checkFlags(t)

ctx := context.Background()
dbname := "inventory"
username := "debezium"
password := "dbz"
port := setupTestContainer(t, dbname, username, password)
port := setupTestContainer(ctx, t, dbname, username, password)
host := "localhost"
connectionProperties := []string{
"database.dbname=inventory",
Expand Down
75 changes: 29 additions & 46 deletions sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,72 +30,56 @@ import (
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
"github.com/apache/beam/sdks/v2/go/test/integration/internal/containers"
"github.com/docker/go-connections/nat"
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"gopkg.in/retry.v1"
)

const (
postgresImage = "postgres"
postgresPort = "5432/tcp"
maxRetries = 5
)

var expansionAddr string // Populate with expansion address labelled "schemaio".
const maxRetryCount = 5

func checkFlags(t *testing.T) {
if expansionAddr == "" {
t.Skip("No Schema IO expansion address provided.")
}
}

func setupTestContainer(t *testing.T, ctx context.Context, dbname, username, password string) (testcontainers.Container, int) {
func setupTestContainer(ctx context.Context, t *testing.T, dbname, username, password string) string {
t.Helper()

var env = map[string]string{
env := map[string]string{
"POSTGRES_PASSWORD": password,
"POSTGRES_USER": username,
"POSTGRES_DB": dbname,
}
hostname := "localhost"

var port = "5432/tcp"
dbURL := func(host string, port nat.Port) string {
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", username, password, host, port.Port(), dbname)
}

req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "postgres",
ExposedPorts: []string{port},
Env: env,
Hostname: "localhost",
WaitingFor: wait.ForSQL(nat.Port(port), "postgres", dbURL).Timeout(time.Second * 5),
},
Started: true,
}

strategy := retry.LimitCount(maxRetryCount,
retry.Exponential{
Initial: time.Second,
Factor: 2,
},
waitStrategy := wait.ForSQL(postgresPort, "postgres", dbURL).WithStartupTimeout(time.Second * 5)

container := containers.NewContainer(
ctx,
t,
postgresImage,
maxRetries,
containers.WithPorts([]string{postgresPort}),
containers.WithEnv(env),
containers.WithHostname(hostname),
containers.WithWaitStrategy(waitStrategy),
)
var container testcontainers.Container
var err error
for r := retry.Start(strategy, nil); r.Next(); {
container, err = testcontainers.GenericContainer(ctx, req)
if err == nil {
break
}
if r.Count() == maxRetryCount {
t.Fatalf("failed to start container with %v retries: %v", maxRetryCount, err)
}
}

mappedPort, err := container.MappedPort(ctx, nat.Port(port))
if err != nil {
t.Fatalf("failed to get container external port: %s", err)
}
mappedPort := containers.Port(ctx, t, container, postgresPort)

url := fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable", username, password, mappedPort.Port(), dbname)
url := fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable", username, password, mappedPort, dbname)
db, err := sql.Open("postgres", url)
if err != nil {
t.Fatalf("failed to establish database connection: %s", err)
Expand All @@ -106,7 +90,8 @@ func setupTestContainer(t *testing.T, ctx context.Context, dbname, username, pas
if err != nil {
t.Fatalf("can't create table, check command and access level")
}
return container, mappedPort.Int()

return mappedPort
}

// TestJDBCIO_BasicReadWrite tests basic read and write transform from JDBC.
Expand All @@ -119,11 +104,10 @@ func TestJDBCIO_BasicReadWrite(t *testing.T) {
username := "newuser"
password := "password"

cont, port := setupTestContainer(t, ctx, dbname, username, password)
defer cont.Terminate(ctx)
port := setupTestContainer(ctx, t, dbname, username, password)
tableName := "roles"
host := "localhost"
jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname)
jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%s/%s", host, port, dbname)

write := WritePipeline(expansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password)
ptest.RunAndValidate(t, write)
Expand All @@ -141,11 +125,10 @@ func TestJDBCIO_PostgresReadWrite(t *testing.T) {
username := "newuser"
password := "password"
ctx := context.Background()
cont, port := setupTestContainer(t, ctx, dbname, username, password)
defer cont.Terminate(ctx)
port := setupTestContainer(ctx, t, dbname, username, password)
tableName := "roles"
host := "localhost"
jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname)
jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%s/%s", host, port, dbname)

write := WriteToPostgres(expansionAddr, tableName, jdbcUrl, username, password)
ptest.RunAndValidate(t, write)
Expand Down