Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
95518: roachtest/awsdms: add no pk full load test case r=Jeremyyang920 a=Jeremyyang920

This commit adds a new test case that attempts a full load on a table with no primary key
and asserts that the full load reports a table error.

Fixes: #95328

Release note: None

95526: multitenant: append `WITH REPLICATION STATUS` columns to `SHOW TENANT` columns r=knz a=ecwall

Informs #87851

`WITH CAPABILITIES` will be adding another set of columns so append relevant columns instead of creating all possible column sets ahead of time.

Release note: None

95536: grunning: add grunning.Elapsed() r=irfansharif a=irfansharif

Elapsed returns the running time spent doing some piece of work, with grunning.Time() measurements from the start and end. This only exists due to grunning.Time()'s non-monotonicity, a bug in our runtime patch: #95529. We can get rid of this, keeping just grunning.Difference(), if that bug is fixed. The bug results in slight {over,under}-estimation of the running time (the latter breaking monotonicity), but is livable with our current uses of this library.

Release note: None

Co-authored-by: Jeremy Yang <[email protected]>
Co-authored-by: Evan Wall <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
  • Loading branch information
4 people committed Jan 19, 2023
4 parents 9c9ec12 + 1d59169 + f0b06e4 + 85046a4 commit ba572fc
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 139 deletions.
231 changes: 151 additions & 80 deletions pkg/cmd/roachtest/tests/awsdms.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package tests

import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"os"
Expand Down Expand Up @@ -46,6 +47,37 @@ const (
awsdmsNumInitialRows = 100000
)

// dmsTask describes one DMS replication task that the test creates.
// The package-level task list is built with positional literals, so the
// field order here must not change.
type dmsTask struct {
// tableName is the table the task targets. It is substituted into
// tableMappings and is also used to derive the task identifier.
tableName string
// tableMappings is a fmt format string for the task's table-mapping JSON;
// it is formatted with tableName before being sent to DMS.
tableMappings string
// replicationSettings is passed through as the task's
// ReplicationTaskSettings. It may be nil.
replicationSettings *string
}

// tableRules is a fmt format string for a DMS table-mapping document with a
// single selection rule that includes one table. The "%s" verb is replaced
// with the table name; "%%" is an escaped percent sign, so the schema name
// becomes the literal "%" after formatting.
const tableRules = `{
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "%%",
"table-name": "%s"
},
"rule-action": "include"
}
]
}`

// dmsTasks lists the replication tasks to create, one per table. Entries are
// positional: tableName, tableMappings, replicationSettings.
var dmsTasks = []dmsTask{
// Table with a primary key; no explicit replication task settings.
{"test_table", tableRules, nil},
// Table without a primary key; sets the full-load TargetTablePrepMode to
// TRUNCATE_BEFORE_LOAD.
{"test_table_no_pk", tableRules, proto.String(`{
"FullLoadSettings":{
"TargetTablePrepMode":"TRUNCATE_BEFORE_LOAD"
}
}`),
},
}

func awsdmsVerString(v *version.Version) string {
if ciBranch := os.Getenv("TC_BUILD_BRANCH"); ciBranch != "" {
ciBranch = strings.ReplaceAll(ciBranch, ".", "-")
Expand All @@ -66,8 +98,8 @@ func awsdmsRoachtestDMSParameterGroup(v *version.Version) string {
return "roachtest-awsdms-param-group-" + awsdmsVerString(v)
}

func awsdmsRoachtestDMSTaskName(v *version.Version) string {
return "roachtest-awsdms-dms-task-" + awsdmsVerString(v)
func awsdmsRoachtestDMSTaskName(v *version.Version, tableName string) string {
return fmt.Sprintf("roachtest-awsdms-dms-task-%s-%s", strings.ReplaceAll(tableName, "_", "-"), awsdmsVerString(v))
}

func awsdmsRoachtestDMSReplicationInstanceName(v *version.Version) string {
Expand Down Expand Up @@ -107,12 +139,14 @@ func dmsDescribeInstancesInput(v *version.Version) *dms.DescribeReplicationInsta
}
}

func dmsDescribeTasksInput(v *version.Version) *dms.DescribeReplicationTasksInput {
func dmsDescribeTasksInput(
v *version.Version, tableName string,
) *dms.DescribeReplicationTasksInput {
return &dms.DescribeReplicationTasksInput{
Filters: []dmstypes.Filter{
{
Name: proto.String("replication-task-id"),
Values: []string{awsdmsRoachtestDMSTaskName(v)},
Values: []string{awsdmsRoachtestDMSTaskName(v, tableName)},
},
},
}
Expand Down Expand Up @@ -175,6 +209,14 @@ func runAWSDMS(ctx context.Context, t test.Test, c cluster.Cluster) {
}
targetPGConn := c.Conn(ctx, t.L(), 1)

checkDMSReplicated(ctx, t, sourcePGConn, targetPGConn)
checkDMSNoPKTableError(ctx, t, dmsCli)
t.L().Printf("testing complete")
}

func checkDMSReplicated(
ctx context.Context, t test.Test, sourcePGConn *pgx.Conn, targetPGConn *gosql.DB,
) {
waitForReplicationRetryOpts := retry.Options{
MaxBackoff: time.Second,
MaxRetries: 90,
Expand Down Expand Up @@ -267,8 +309,40 @@ func runAWSDMS(ctx context.Context, t test.Test, c cluster.Cluster) {
}(); err != nil {
t.Fatal(err)
}
}

t.L().Printf("testing complete")
// checkDMSNoPKTableError polls the DMS replication task for test_table_no_pk
// until it reports exactly one errored table. It calls t.Fatal if no table
// error is observed before the retry budget is exhausted or ctx is done.
func checkDMSNoPKTableError(ctx context.Context, t test.Test, dmsCli *dms.Client) {
	waitForTableError := retry.Options{
		MaxBackoff: time.Second,
		MaxRetries: 90,
	}
	t.L().Printf("testing no pk table has a table error")

	// findTableError performs a single describe call and returns nil only
	// when the task reports the expected table error.
	findTableError := func() error {
		out, err := dmsCli.DescribeReplicationTasks(
			ctx, dmsDescribeTasksInput(t.BuildVersion(), "test_table_no_pk"),
		)
		if err != nil {
			// The output must not be inspected when the call fails. Every
			// error, including "resource not found", is returned so the
			// loop below logs it and retries instead of dereferencing a
			// nil result.
			return err
		}
		for _, task := range out.ReplicationTasks {
			// The stats field is a pointer; skip tasks that do not carry
			// stats yet rather than dereferencing nil.
			if task.ReplicationTaskStats == nil {
				continue
			}
			if task.ReplicationTaskStats.TablesErrored == 1 {
				t.L().Printf("table error was found")
				return nil
			}
		}
		return errors.New("no table error found yet")
	}

	for r := retry.StartWithCtx(ctx, waitForTableError); r.Next(); {
		err := findTableError()
		if err == nil {
			return
		}
		t.L().Printf("table error not found, retrying: %+v", err)
	}
	t.Fatal(errors.Newf("failed to find table error"))
}

// setupAWSDMS sets up an RDS instance and a DMS instance which sets up a
Expand Down Expand Up @@ -452,6 +526,11 @@ func setupRDSCluster(
`INSERT INTO test_table(id, t) SELECT i, md5(random()::text) FROM generate_series(1, %d) AS t(i)`,
awsdmsNumInitialRows,
),
`CREATE TABLE test_table_no_pk(id integer, t TEXT)`,
fmt.Sprintf(
`INSERT INTO test_table_no_pk(id, t) SELECT i, md5(random()::text) FROM generate_series(1, %d) AS t(i)`,
awsdmsNumInitialRows,
),
} {
if _, err := pgConn.Exec(
ctx,
Expand Down Expand Up @@ -579,56 +658,46 @@ func setupDMSEndpointsAndTask(
}
}

t.L().Printf("creating replication task")
replTaskOut, err := dmsCli.CreateReplicationTask(
ctx,
&dms.CreateReplicationTaskInput{
MigrationType: dmstypes.MigrationTypeValueFullLoadAndCdc,
ReplicationInstanceArn: proto.String(replicationARN),
ReplicationTaskIdentifier: proto.String(awsdmsRoachtestDMSTaskName(t.BuildVersion())),
SourceEndpointArn: proto.String(sourceARN),
TargetEndpointArn: proto.String(targetARN),
// TODO(#migrations): when AWS API supports EnableValidation, add it here.
TableMappings: proto.String(`{
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "%",
"table-name": "%"
},
"rule-action": "include"
}
]
}`),
},
)
if err != nil {
return err
}
t.L().Printf("waiting for replication task to be ready")
if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil {
return err
}
t.L().Printf("starting replication task")
if _, err := dmsCli.StartReplicationTask(
ctx,
&dms.StartReplicationTaskInput{
ReplicationTaskArn: replTaskOut.ReplicationTask.ReplicationTaskArn,
StartReplicationTaskType: dmstypes.StartReplicationTaskTypeValueReloadTarget,
},
); err != nil {
return err
}
t.L().Printf("waiting for replication task to be running")
if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait(
ctx,
dmsDescribeTasksInput(t.BuildVersion()),
awsdmsWaitTimeLimit,
); err != nil {
return err
for _, task := range dmsTasks {
t.L().Printf(fmt.Sprintf("creating replication task for %s", task.tableName))
replTaskOut, err := dmsCli.CreateReplicationTask(
ctx,
&dms.CreateReplicationTaskInput{
MigrationType: dmstypes.MigrationTypeValueFullLoadAndCdc,
ReplicationInstanceArn: proto.String(replicationARN),
ReplicationTaskIdentifier: proto.String(awsdmsRoachtestDMSTaskName(t.BuildVersion(), task.tableName)),
SourceEndpointArn: proto.String(sourceARN),
TargetEndpointArn: proto.String(targetARN),
// TODO(#migrations): when AWS API supports EnableValidation, add it here.
TableMappings: proto.String(fmt.Sprintf(task.tableMappings, task.tableName)),
ReplicationTaskSettings: task.replicationSettings,
},
)
if err != nil {
return err
}
t.L().Printf("waiting for replication task to be ready")
if err := dms.NewReplicationTaskReadyWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
return err
}
t.L().Printf("starting replication task")
if _, err := dmsCli.StartReplicationTask(
ctx,
&dms.StartReplicationTaskInput{
ReplicationTaskArn: replTaskOut.ReplicationTask.ReplicationTaskArn,
StartReplicationTaskType: dmstypes.StartReplicationTaskTypeValueReloadTarget,
},
); err != nil {
return err
}
t.L().Printf("waiting for replication task to be running")
if err := dms.NewReplicationTaskRunningWaiter(dmsCli).Wait(
ctx,
dmsDescribeTasksInput(t.BuildVersion(), task.tableName),
awsdmsWaitTimeLimit,
); err != nil {
return err
}
}
return nil
}
Expand Down Expand Up @@ -662,38 +731,40 @@ func tearDownAWSDMS(
// tearDownDMSTasks stops and deletes the DMS replication tasks that may have
// been created, waiting for each task to be fully deleted.
func tearDownDMSTasks(ctx context.Context, t test.Test, dmsCli *dms.Client) error {
dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion()))
if err != nil {
if !isDMSResourceNotFound(err) {
return err
}
} else {
wasRunning := false
for _, task := range dmsTasks.ReplicationTasks {
if *task.Status == "running" {
t.L().Printf("stopping DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.StopReplicationTask(ctx, &dms.StopReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
for _, task := range dmsTasks {
dmsTasks, err := dmsCli.DescribeReplicationTasks(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName))
if err != nil {
if !isDMSResourceNotFound(err) {
return err
}
} else {
wasRunning := false
for _, task := range dmsTasks.ReplicationTasks {
if *task.Status == "running" {
t.L().Printf("stopping DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.StopReplicationTask(ctx, &dms.StopReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
return err
}
wasRunning = true
}
}
if wasRunning {
t.L().Printf("waiting for task to be stopped")
if err := dms.NewReplicationTaskStoppedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
return err
}
wasRunning = true
}
}
if wasRunning {
t.L().Printf("waiting for task to be stopped")
if err := dms.NewReplicationTaskStoppedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil {
return err
for _, task := range dmsTasks.ReplicationTasks {
t.L().Printf("deleting DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.DeleteReplicationTask(ctx, &dms.DeleteReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
return err
}
}
}
for _, task := range dmsTasks.ReplicationTasks {
t.L().Printf("deleting DMS task %s (arn: %s)", *task.ReplicationTaskIdentifier, *task.ReplicationTaskArn)
if _, err := dmsCli.DeleteReplicationTask(ctx, &dms.DeleteReplicationTaskInput{ReplicationTaskArn: task.ReplicationTaskArn}); err != nil {
t.L().Printf("waiting for task to be deleted")
if err := dms.NewReplicationTaskDeletedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion(), task.tableName), awsdmsWaitTimeLimit); err != nil {
return err
}
}
t.L().Printf("waiting for task to be deleted")
if err := dms.NewReplicationTaskDeletedWaiter(dmsCli).Wait(ctx, dmsDescribeTasksInput(t.BuildVersion()), awsdmsWaitTimeLimit); err != nil {
return err
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2108,7 +2108,7 @@ func (r *Replica) MeasureRaftCPUNanos(start time.Duration) {
// is recorded against the replica's cpu time attribution.
func (r *Replica) measureNanosRunning(start time.Duration, f func(float64)) {
end := grunning.Time()
dur := grunning.Difference(start, end).Nanoseconds()
dur := grunning.Elapsed(start, end).Nanoseconds()
f(float64(dur))
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/catalog/colinfo/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,16 @@ var ExportColumns = ResultColumns{
{Name: "bytes", Typ: types.Int},
}

// TenantColumns are the base result columns (id, name, status) that appear
// in all SHOW TENANT queries.
var TenantColumns = ResultColumns{
{Name: "id", Typ: types.Int},
{Name: "name", Typ: types.String},
{Name: "status", Typ: types.String},
}

// TenantColumnsWithReplication is appended to TenantColumns for
// SHOW TENANT ... WITH REPLICATION STATUS queries.
var TenantColumnsWithReplication = ResultColumns{
{Name: "id", Typ: types.Int},
{Name: "name", Typ: types.String},
{Name: "status", Typ: types.String},
{Name: "source_tenant_name", Typ: types.String},
{Name: "source_cluster_uri", Typ: types.String},
{Name: "replication_job_id", Typ: types.Int},
Expand Down
Loading

0 comments on commit ba572fc

Please sign in to comment.