diff --git a/pkg/acceptance/cluster/dockercluster.go b/pkg/acceptance/cluster/dockercluster.go index fe72120dc9ae..afadf54f33bb 100644 --- a/pkg/acceptance/cluster/dockercluster.go +++ b/pkg/acceptance/cluster/dockercluster.go @@ -200,6 +200,11 @@ func CreateDocker( } } +// Client returns the underlying docker client. +func (l *DockerCluster) Client() client.APIClient { + return l.client +} + func (l *DockerCluster) expectEvent(c *Container, msgs ...string) { for index, ctr := range l.Nodes { if c.id != ctr.id { diff --git a/pkg/ccl/acceptanceccl/cdc_kafka_test.go b/pkg/ccl/acceptanceccl/cdc_kafka_test.go index 689c2bd81149..6b71b80d8cdf 100644 --- a/pkg/ccl/acceptanceccl/cdc_kafka_test.go +++ b/pkg/ccl/acceptanceccl/cdc_kafka_test.go @@ -12,6 +12,7 @@ import ( "context" "encoding/json" "fmt" + "io/ioutil" "math/rand" "net" "reflect" @@ -36,17 +37,19 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/bank" + "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/go-connections/nat" ) func TestCDC(t *testing.T) { - t.Skip("#28102") s := log.Scope(t) defer s.Close(t) acceptance.RunDocker(t, func(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cfg := acceptance.ReadConfigFromFlags() // Should we thread the old value of cfg.Nodes to the TestCluster? cfg.Nodes = nil @@ -63,6 +66,30 @@ func TestCDC(t *testing.T) { t.Fatalf(`%+v`, err) } defer k.Close(ctx) + go func() { + select { + case <-ctx.Done(): + return + case <-time.After(time.Minute): + } + // Additional debugging in case #28102 shows up again. + logs, err := c.Client().ContainerLogs( + context.Background(), + k.serviceContainers[`kafka`].Name(), + types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true}, + ) + if err != nil { + log.Warningf(ctx, "unable to get additional debugging: %v", err) + return + } + defer logs.Close() + logsBytes, err := ioutil.ReadAll(logs) + if err != nil { + log.Warningf(ctx, "unable to get additional debugging: %v", err) + return + } + log.Infof(ctx, "KAFKA DOCKER CONTAINER LOGS:\n%s", string(logsBytes)) + }() t.Run(`Description`, func(t *testing.T) { testDescription(ctx, t, c, k) }) t.Run(`PauseUnpause`, func(t *testing.T) { testPauseUnpause(ctx, t, c, k) })