Skip to content

Commit

Permalink
Refactor integration tests to use internal containers package (#24823)
Browse files Browse the repository at this point in the history
  • Loading branch information
johannaojeling authored Dec 29, 2022
1 parent c5c5b35 commit 80de3ef
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 75 deletions.
45 changes: 42 additions & 3 deletions sdks/go/test/integration/internal/containers/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,45 @@ package containers
import (
"context"
"testing"
"time"

"github.com/docker/go-connections/nat"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"gopkg.in/retry.v1"
)

type ContainerOptionFn func(*testcontainers.ContainerRequest)

func WithEnv(env map[string]string) ContainerOptionFn {
return func(option *testcontainers.ContainerRequest) {
option.Env = env
}
}

func WithHostname(hostname string) ContainerOptionFn {
return func(option *testcontainers.ContainerRequest) {
option.Hostname = hostname
}
}

func WithPorts(ports []string) ContainerOptionFn {
return func(option *testcontainers.ContainerRequest) {
option.ExposedPorts = ports
}
}

func WithWaitStrategy(waitStrategy wait.Strategy) ContainerOptionFn {
return func(option *testcontainers.ContainerRequest) {
option.WaitingFor = waitStrategy
}
}

func NewContainer(
ctx context.Context,
t *testing.T,
image string,
maxRetries int,
opts ...ContainerOptionFn,
) testcontainers.Container {
t.Helper()
Expand All @@ -51,9 +73,26 @@ func NewContainer(
Started: true,
}

container, err := testcontainers.GenericContainer(ctx, genericRequest)
if err != nil {
t.Fatalf("error creating container: %v", err)
strategy := retry.LimitCount(
maxRetries,
retry.Exponential{
Initial: time.Second,
Factor: 2,
},
)

var container testcontainers.Container
var err error

for attempt := retry.Start(strategy, nil); attempt.Next(); {
container, err = testcontainers.GenericContainer(ctx, genericRequest)
if err == nil {
break
}

if attempt.Count() == maxRetries {
t.Fatalf("failed to start container with %v retries: %v", maxRetries, err)
}
}

t.Cleanup(func() {
Expand Down
6 changes: 4 additions & 2 deletions sdks/go/test/integration/io/mongodbio/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import (

const (
mongoImage = "mongo:6.0.3"
mongoPort = "27017"
mongoPort = "27017/tcp"
maxRetries = 5
)

func setUpTestContainer(ctx context.Context, t *testing.T) string {
Expand All @@ -39,7 +40,8 @@ func setUpTestContainer(ctx context.Context, t *testing.T) string {
ctx,
t,
mongoImage,
containers.WithPorts([]string{mongoPort + "/tcp"}),
maxRetries,
containers.WithPorts([]string{mongoPort}),
)

return containers.Port(ctx, t, container, mongoPort)
Expand Down
44 changes: 20 additions & 24 deletions sdks/go/test/integration/io/xlang/debezium/debezium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,14 @@ import (
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
"github.com/docker/go-connections/nat"
"github.com/apache/beam/sdks/v2/go/test/integration/internal/containers"
_ "github.com/lib/pq"
"github.com/testcontainers/testcontainers-go"
)

const (
debeziumImage = "debezium/example-postgres:latest"
debeziumPort = "5432/tcp"
maxRetries = 5
)

var expansionAddr string // Populate with expansion address labelled "debeziumio".
Expand All @@ -42,46 +47,37 @@ func checkFlags(t *testing.T) {
}
}

func setupTestContainer(t *testing.T, dbname, username, password string) string {
func setupTestContainer(ctx context.Context, t *testing.T, dbname, username, password string) string {
t.Helper()

var env = map[string]string{
env := map[string]string{
"POSTGRES_PASSWORD": password,
"POSTGRES_USER": username,
"POSTGRES_DB": dbname,
}
var port = "5432/tcp"

req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "debezium/example-postgres:latest",
ExposedPorts: []string{port},
Env: env,
},
Started: true,
}
ctx := context.Background()
container, err := testcontainers.GenericContainer(ctx, req)
if err != nil {
t.Fatalf("failed to start container: %v", err)
}
container := containers.NewContainer(
ctx,
t,
debeziumImage,
maxRetries,
containers.WithEnv(env),
containers.WithPorts([]string{debeziumPort}),
)

mappedPort, err := container.MappedPort(ctx, nat.Port(port))
if err != nil {
t.Fatalf("failed to get container external port: %v", err)
}
return mappedPort.Port()
return containers.Port(ctx, t, container, debeziumPort)
}

// TestDebeziumIO_BasicRead tests basic read transform from Debezium.
func TestDebeziumIO_BasicRead(t *testing.T) {
integration.CheckFilters(t)
checkFlags(t)

ctx := context.Background()
dbname := "inventory"
username := "debezium"
password := "dbz"
port := setupTestContainer(t, dbname, username, password)
port := setupTestContainer(ctx, t, dbname, username, password)
host := "localhost"
connectionProperties := []string{
"database.dbname=inventory",
Expand Down
75 changes: 29 additions & 46 deletions sdks/go/test/integration/io/xlang/jdbc/jdbc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,72 +30,56 @@ import (
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/spark"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
"github.com/apache/beam/sdks/v2/go/test/integration"
"github.com/apache/beam/sdks/v2/go/test/integration/internal/containers"
"github.com/docker/go-connections/nat"
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"gopkg.in/retry.v1"
)

const (
postgresImage = "postgres"
postgresPort = "5432/tcp"
maxRetries = 5
)

var expansionAddr string // Populate with expansion address labelled "schemaio".
const maxRetryCount = 5

func checkFlags(t *testing.T) {
if expansionAddr == "" {
t.Skip("No Schema IO expansion address provided.")
}
}

func setupTestContainer(t *testing.T, ctx context.Context, dbname, username, password string) (testcontainers.Container, int) {
func setupTestContainer(ctx context.Context, t *testing.T, dbname, username, password string) string {
t.Helper()

var env = map[string]string{
env := map[string]string{
"POSTGRES_PASSWORD": password,
"POSTGRES_USER": username,
"POSTGRES_DB": dbname,
}
hostname := "localhost"

var port = "5432/tcp"
dbURL := func(host string, port nat.Port) string {
return fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", username, password, host, port.Port(), dbname)
}

req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "postgres",
ExposedPorts: []string{port},
Env: env,
Hostname: "localhost",
WaitingFor: wait.ForSQL(nat.Port(port), "postgres", dbURL).Timeout(time.Second * 5),
},
Started: true,
}

strategy := retry.LimitCount(maxRetryCount,
retry.Exponential{
Initial: time.Second,
Factor: 2,
},
waitStrategy := wait.ForSQL(postgresPort, "postgres", dbURL).WithStartupTimeout(time.Second * 5)

container := containers.NewContainer(
ctx,
t,
postgresImage,
maxRetries,
containers.WithPorts([]string{postgresPort}),
containers.WithEnv(env),
containers.WithHostname(hostname),
containers.WithWaitStrategy(waitStrategy),
)
var container testcontainers.Container
var err error
for r := retry.Start(strategy, nil); r.Next(); {
container, err = testcontainers.GenericContainer(ctx, req)
if err == nil {
break
}
if r.Count() == maxRetryCount {
t.Fatalf("failed to start container with %v retries: %v", maxRetryCount, err)
}
}

mappedPort, err := container.MappedPort(ctx, nat.Port(port))
if err != nil {
t.Fatalf("failed to get container external port: %s", err)
}
mappedPort := containers.Port(ctx, t, container, postgresPort)

url := fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable", username, password, mappedPort.Port(), dbname)
url := fmt.Sprintf("postgres://%s:%s@localhost:%s/%s?sslmode=disable", username, password, mappedPort, dbname)
db, err := sql.Open("postgres", url)
if err != nil {
t.Fatalf("failed to establish database connection: %s", err)
Expand All @@ -106,7 +90,8 @@ func setupTestContainer(t *testing.T, ctx context.Context, dbname, username, pas
if err != nil {
t.Fatalf("can't create table, check command and access level")
}
return container, mappedPort.Int()

return mappedPort
}

// TestJDBCIO_BasicReadWrite tests basic read and write transform from JDBC.
Expand All @@ -119,11 +104,10 @@ func TestJDBCIO_BasicReadWrite(t *testing.T) {
username := "newuser"
password := "password"

cont, port := setupTestContainer(t, ctx, dbname, username, password)
defer cont.Terminate(ctx)
port := setupTestContainer(ctx, t, dbname, username, password)
tableName := "roles"
host := "localhost"
jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname)
jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%s/%s", host, port, dbname)

write := WritePipeline(expansionAddr, tableName, "org.postgresql.Driver", jdbcUrl, username, password)
ptest.RunAndValidate(t, write)
Expand All @@ -141,11 +125,10 @@ func TestJDBCIO_PostgresReadWrite(t *testing.T) {
username := "newuser"
password := "password"
ctx := context.Background()
cont, port := setupTestContainer(t, ctx, dbname, username, password)
defer cont.Terminate(ctx)
port := setupTestContainer(ctx, t, dbname, username, password)
tableName := "roles"
host := "localhost"
jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%d/%s", host, port, dbname)
jdbcUrl := fmt.Sprintf("jdbc:postgresql://%s:%s/%s", host, port, dbname)

write := WriteToPostgres(expansionAddr, tableName, jdbcUrl, username, password)
ptest.RunAndValidate(t, write)
Expand Down

0 comments on commit 80de3ef

Please sign in to comment.