Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

roachtest: expand and cloudify cdc/initial-scan-rolling-restart #123924

Merged
merged 1 commit into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ go_library(
"//pkg/util/uuid",
"//pkg/util/version",
"//pkg/workload",
"//pkg/workload/debug",
"//pkg/workload/histogram",
"//pkg/workload/querybench",
"//pkg/workload/tpcc",
Expand Down
96 changes: 72 additions & 24 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload/debug"
"github.com/cockroachdb/errors"
"golang.org/x/oauth2/clientcredentials"
)
Expand Down Expand Up @@ -908,11 +909,23 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
m.Wait()
}

func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cluster) {
const rowCount, splitCount = 1000000, 500
type cdcCheckpointType int

const (
cdcNormalCheckpoint cdcCheckpointType = iota
cdcShutdownCheckpoint
)

// runCDCInitialScanRollingRestart runs multiple initial-scan-only changefeeds
// on a 4-node cluster, using node 1 as the coordinator and continuously
// restarting nodes 2-4 to hopefully force the changefeed to replan and exercise
// the checkpoint restore logic.
func runCDCInitialScanRollingRestart(
ctx context.Context, t test.Test, c cluster.Cluster, checkpointType cdcCheckpointType,
) {
startOpts := option.DefaultStartOpts()
ips, err := c.InternalIP(ctx, t.L(), c.Node(1))
sinkURL := fmt.Sprintf("https://%s:9707", ips[0])
ips, err := c.ExternalIP(ctx, t.L(), c.Node(1))
sinkURL := fmt.Sprintf("https://%s:%d", ips[0], debug.WebhookServerPort)
sink := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
if err != nil {
t.Fatal(err)
Expand All @@ -922,7 +935,7 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl
racks := install.MakeClusterSettings(install.NumRacksOption(c.Spec().NodeCount))
racks.Env = append(racks.Env, `COCKROACH_CHANGEFEED_TESTING_FAST_RETRY=true`)
c.Start(ctx, t.L(), option.DefaultStartOpts(), racks)
m := c.NewMonitor(ctx, c.Range(1, 5))
m := c.NewMonitor(ctx, c.All())

restart := func(n int) error {
cmd := fmt.Sprintf("./cockroach node drain --certs-dir=%s --port={pgport:%d} --self", install.CockroachNodeCertsDir, n)
Expand All @@ -945,21 +958,42 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl

db := c.Conn(ctx, t.L(), 1)

// Setup a 1M row table that is split into >= 500 scattered ranges.
// Setup a large table with 1M rows and a small table with 5 rows.
// Keep ranges off n1 so that our plans use 2, 3, and 4.
const (
largeRowCount = 1000000
smallRowCount = 5
)
t.L().Printf("setting up test data...")
for _, s := range []string{
setupStmts := []string{
`ALTER RANGE default CONFIGURE ZONE USING constraints = '[-rack=0]'`,
fmt.Sprintf(`CREATE TABLE t (id PRIMARY KEY) AS SELECT generate_series(1, %d) id`, rowCount),
`ALTER TABLE t SCATTER`,
// Split some bigger chunks up to scatter it a bit more.
fmt.Sprintf(`ALTER TABLE t SPLIT AT SELECT id FROM t ORDER BY random() LIMIT %d`, splitCount/4),
`ALTER TABLE t SCATTER`,
// Finish splitting, so that drained ranges spread out evenly.
fmt.Sprintf(`ALTER TABLE t SPLIT AT SELECT id FROM t ORDER BY random() LIMIT %d`, splitCount),
fmt.Sprintf(`CREATE TABLE large (id PRIMARY KEY) AS SELECT generate_series(1, %d) id`, largeRowCount),
`ALTER TABLE large SCATTER`,
fmt.Sprintf(`CREATE TABLE small (id PRIMARY KEY) AS SELECT generate_series(%d, %d)`, largeRowCount+1, largeRowCount+smallRowCount),
`ALTER TABLE small SCATTER`,
`SET CLUSTER SETTING jobs.registry.retry.initial_delay = '.1s'`,
`SET CLUSTER SETTING jobs.registry.retry.max_delay = '.4s'`,
} {
}
switch checkpointType {
case cdcNormalCheckpoint:
setupStmts = append(setupStmts,
`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '1s'`,
`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'false'`,
)
case cdcShutdownCheckpoint:
const largeSplitCount = 5
setupStmts = append(setupStmts,
`SET CLUSTER SETTING changefeed.frontier_checkpoint_frequency = '0'`,
`SET CLUSTER SETTING changefeed.shutdown_checkpoint.enabled = 'true'`,
// Split some bigger chunks up to scatter it a bit more.
fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount/4),
`ALTER TABLE large SCATTER`,
// Finish splitting, so that drained ranges spread out evenly.
fmt.Sprintf(`ALTER TABLE large SPLIT AT SELECT id FROM large ORDER BY random() LIMIT %d`, largeSplitCount),
`ALTER TABLE large SCATTER`,
)
}
for _, s := range setupStmts {
t.L().Printf(s)
if _, err := db.Exec(s); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -988,7 +1022,7 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl
}()
t.L().Printf("starting rolling drain+restarts of 2, 3, 4...")
for {
for _, n := range []int{2, 3, 4, 5} {
for _, n := range []int{2, 3, 4} {
select {
case <-stopRestarts:
return nil
Expand All @@ -1015,11 +1049,12 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl
t.L().Printf("exiting webhook sink status: %v", err)
}()

for i := 1; i < 5; i++ {
const numChangefeeds = 5
for i := 1; i < numChangefeeds; i++ {
t.L().Printf("starting changefeed...")
var job int
if err := db.QueryRow(
fmt.Sprintf("CREATE CHANGEFEED FOR TABLE t INTO 'webhook-%s/?insecure_tls_skip_verify=true' WITH initial_scan='only'", sinkURL),
fmt.Sprintf("CREATE CHANGEFEED FOR TABLE large, small INTO 'webhook-%s/?insecure_tls_skip_verify=true' WITH initial_scan='only'", sinkURL),
).Scan(&job); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1054,8 +1089,9 @@ func runCDCBackfillRollingRestart(ctx context.Context, t test.Test, c cluster.Cl
t.Fatal(err)
}
t.L().Printf("sink got %d unique, %d dupes", unique, dupes)
if unique != rowCount {
t.Fatalf("expected %d, got %d", rowCount, unique)
expected := largeRowCount + smallRowCount
if unique != expected {
t.Fatalf("expected %d, got %d", expected, unique)
}
_, err = sink.Get(sinkURL + "/reset")
t.L().Printf("resetting sink %v", err)
Expand Down Expand Up @@ -1286,15 +1322,27 @@ func registerCDC(r registry.Registry) {
},
})
r.Add(registry.TestSpec{
Name: "cdc/initial-scan-rolling-restart",
Name: "cdc/initial-scan-rolling-restart/normal-checkpoint",
Owner: registry.OwnerCDC,
Cluster: r.MakeClusterSpec(4),
RequiresLicense: true,
CompatibleClouds: registry.OnlyGCE,
Suites: registry.Suites(registry.Nightly),
Timeout: time.Minute * 15,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCInitialScanRollingRestart(ctx, t, c, cdcNormalCheckpoint)
},
})
r.Add(registry.TestSpec{
Name: "cdc/initial-scan-rolling-restart/shutdown-checkpoint",
Owner: registry.OwnerCDC,
Cluster: r.MakeClusterSpec(5),
Cluster: r.MakeClusterSpec(4),
RequiresLicense: true,
CompatibleClouds: registry.OnlyLocal,
CompatibleClouds: registry.OnlyGCE,
Suites: registry.Suites(registry.Nightly),
Timeout: time.Minute * 15,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
runCDCBackfillRollingRestart(ctx, t, c)
runCDCInitialScanRollingRestart(ctx, t, c, cdcShutdownCheckpoint)
},
})
r.Add(registry.TestSpec{
Expand Down
6 changes: 3 additions & 3 deletions pkg/workload/debug/webhook_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var webhookServerCmd = &cobra.Command{
}

const (
port = 9707
WebhookServerPort = 9707
)

func webhookServer(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -124,11 +124,11 @@ func webhookServer(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
log.Printf("starting server on port %d", port)
log.Printf("starting server on port %d", WebhookServerPort)
return (&http.Server{
TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
Handler: mux,
Addr: fmt.Sprintf(":%d", port),
Addr: fmt.Sprintf(":%d", WebhookServerPort),
}).ListenAndServeTLS("", "")
}

Expand Down
Loading