Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Branch2 delete target db pattern #599

Merged
merged 13 commits into from
Dec 11, 2024
392 changes: 220 additions & 172 deletions doc-chinese/toturial/08-Branch.md

Large diffs are not rendered by default.

Binary file added doc-chinese/toturial/images/BranchStatus.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
719 changes: 197 additions & 522 deletions doc/toturial/08-Branch.md

Large diffs are not rendered by default.

Binary file added doc/toturial/images/BranchStatus.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
166 changes: 151 additions & 15 deletions endtoend/branch/branch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/wesql/wescale/endtoend/framework"
"strconv"
"testing"
"time"
)
Expand Down Expand Up @@ -78,10 +79,12 @@ func sourcePrepare() {
"DROP DATABASE IF EXISTS test_db1;",
"DROP DATABASE IF EXISTS test_db2;",
"DROP DATABASE IF EXISTS test_db3;",
"DROP DATABASE IF EXISTS test_db4;",

"CREATE DATABASE test_db1;",
"CREATE DATABASE test_db2;",
"CREATE DATABASE test_db3;",
"CREATE DATABASE test_db4;",

`CREATE TABLE test_db1.users (
id INT PRIMARY KEY AUTO_INCREMENT,
Expand All @@ -106,6 +109,11 @@ func sourcePrepare() {
category VARCHAR(50),
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);`,

`CREATE TABLE test_db4.student (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(200) NOT NULL
);`,
}
for _, statement := range sqlStatements {
_, err := sourceCluster.WescaleDb.Exec(statement)
Expand All @@ -120,6 +128,8 @@ func sourceClean() {
"DROP DATABASE IF EXISTS test_db1;",
"DROP DATABASE IF EXISTS test_db2;",
"DROP DATABASE IF EXISTS test_db3;",
"DROP DATABASE IF EXISTS test_db4;",
"DROP DATABASE IF EXISTS target_db",
}
for _, statement := range sqlStatements {
_, err := sourceCluster.WescaleDb.Exec(statement)
Expand All @@ -134,8 +144,11 @@ func targetPrepare() {
"DROP DATABASE IF EXISTS test_db1;",
"DROP DATABASE IF EXISTS test_db2;",
"DROP DATABASE IF EXISTS test_db3;",
"DROP DATABASE IF EXISTS test_db4;",
"DROP DATABASE IF EXISTS target_db;",

"CREATE DATABASE test_db3;",
"CREATE DATABASE target_db",

`CREATE TABLE test_db3.target_products (
product_id INT PRIMARY KEY AUTO_INCREMENT,
Expand All @@ -145,6 +158,11 @@ func targetPrepare() {
category VARCHAR(50),
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);`,

`CREATE TABLE target_db.target_new_table (
id INT PRIMARY KEY AUTO_INCREMENT,
col1 VARCHAR(200) NOT NULL
);`,
}
for _, statement := range sqlStatements {
_, err := targetCluster.WescaleDb.Exec(statement)
Expand All @@ -159,6 +177,8 @@ func targetClean() {
"DROP DATABASE IF EXISTS test_db1;",
"DROP DATABASE IF EXISTS test_db2;",
"DROP DATABASE IF EXISTS test_db3;",
"DROP DATABASE IF EXISTS test_db4;",
"DROP DATABASE IF EXISTS target_db",
}
for _, statement := range sqlStatements {
_, err := targetCluster.WescaleDb.Exec(statement)
Expand Down Expand Up @@ -193,8 +213,8 @@ func getBranchCreateCMD(
)
}

func getBranchCleanUpCMD() string {
return fmt.Sprintf(`Branch clean_up;`)
func getBranchDeleteCMD() string {
return fmt.Sprintf(`Branch delete;`)
}

// default override
Expand All @@ -213,21 +233,114 @@ func getBranchMergeBackCMD() string {
return fmt.Sprintf(`Branch merge_back;`)
}

func printBranchDiff(rows *sql.Rows) {
func getBranchShowCMD(showOption string) string {
return fmt.Sprintf(`Branch show with ('show_option'='%s');`, showOption)
}

type BranchDiffRow struct {
Name string
Database string
TableName string
DDL string
}

func printBranchDiff(rows *sql.Rows) []BranchDiffRow {
fmt.Printf("---------------------- start printing branch diff ----------------------\n")
rst := make([]BranchDiffRow, 0)
for rows.Next() {
var (
name string
database string
tableName string
ddl string
)
err := rows.Scan(&database, &tableName, &ddl)
err := rows.Scan(&name, &database, &tableName, &ddl)
if err != nil {
panic(err)
}
fmt.Printf("Database: %s, Table: %s, DDL: %s\n", database, tableName, ddl)
rst = append(rst, BranchDiffRow{
Name: name,
Database: database,
TableName: tableName,
DDL: ddl,
})
fmt.Printf("Branch Name: %s, Database: %s, Table: %s, DDL: %s\n", name, database, tableName, ddl)
}
fmt.Printf("---------------------- print branch diff end ----------------------\n")
return rst
}

func branchDiffContains(rows []BranchDiffRow, name, database string, tableName string, ddl string) bool {
for _, row := range rows {
if row.Name == name && row.Database == database && row.TableName == tableName && row.DDL == ddl {
return true
}
}
return false
}

func printBranchShowStatus(rows *sql.Rows) {
fmt.Printf("---------------------- start printing branch show status ----------------------\n")
for rows.Next() {
var (
name string
status string
sourceHost string
sourcePort int
sourceUser string
includeDatabases string
excludeDatabases string
)
err := rows.Scan(&name, &status, &sourceHost, &sourcePort, &sourceUser, &includeDatabases, &excludeDatabases)
if err != nil {
panic(err)
}
fmt.Printf("Branch Name: %s, Status: %s, Source Host: %s, Source Port: %d, Source User: %s, Include Databases: %s, Exclude Databases: %s\n",
name, status, sourceHost, sourcePort, sourceUser, includeDatabases, excludeDatabases)
}
fmt.Printf("---------------------- print branch show status end ----------------------\n")
}

func printBranchShowSnapshot(rows *sql.Rows) {
fmt.Printf("---------------------- start printing branch show snapshot ----------------------\n")
for rows.Next() {
var (
id int
name string
database string
table string
createTableSQL string
updateTimestamp string
)

if err := rows.Scan(&id, &name, &database, &table, &createTableSQL, &updateTimestamp); err != nil {
panic(err)
}
fmt.Printf("Snapshot ID: %d, Branch Name: %s, Database: %s, Table: %s, CreateTableSQL: %s, UpdateTimestamp: %s\n",
id, name, database, table, createTableSQL, updateTimestamp)
}
fmt.Printf("---------------------- print branch show snapshot end ----------------------\n")
}

func printBranchShowMergeBackDDL(rows *sql.Rows) {
fmt.Printf("---------------------- start printing branch show merge back ddl ----------------------\n")
for rows.Next() {
var (
id int
name string
database string
table string
ddl string
merged bool
)

if err := rows.Scan(&id, &name, &database, &table, &ddl, &merged); err != nil {
panic(err)
}
fmt.Printf("Merge Back DDL ID: %d, Branch Name: %s, Database: %s, Table: %s, DDL: %s, Merged or not: %s\n",
id, name, database, table, ddl, strconv.FormatBool(merged))
}
fmt.Printf("---------------------- print branch show merge back ddl end ----------------------\n")
}

func TestBranchBasic(t *testing.T) {
Expand All @@ -236,7 +349,7 @@ func TestBranchBasic(t *testing.T) {
targetPrepare()

// defer cleanup
defer framework.ExecNoError(t, targetCluster.WescaleDb, getBranchCleanUpCMD())
defer framework.ExecNoError(t, targetCluster.WescaleDb, getBranchDeleteCMD())
defer sourceClean()
defer targetClean()

Expand All @@ -262,17 +375,38 @@ func TestBranchBasic(t *testing.T) {
framework.ExecNoError(t, targetCluster.WescaleDb, "ALTER TABLE test_db2.orders ADD COLUMN description TEXT;")
assert.Equal(t, true, framework.CheckColumnExists(t, targetCluster.WescaleDb, "test_db2", "orders", "description"))

framework.ExecNoError(t, targetCluster.WescaleDb, "DROP DATABASE IF EXISTS test_db4")
assert.Equal(t, false, framework.CheckDatabaseExists(t, targetCluster.WescaleDb, "test_db4"))

// branch diff
diffCMD := getBranchDiffCMD("source_target")
rows := framework.QueryNoError(t, targetCluster.WescaleDb, diffCMD)
defer rows.Close()
printBranchDiff(rows)
branchDiff := printBranchDiff(rows)
assert.Equal(t, true, branchDiffContains(branchDiff, "origin", "target_db", "", "CREATE DATABASE IF NOT EXISTS `target_db`"))
assert.Equal(t, true, branchDiffContains(branchDiff, "origin", "test_db4", "", "DROP DATABASE IF EXISTS `test_db4`"))

// branch prepare merge back
rows2 := framework.QueryNoError(t, targetCluster.WescaleDb, getBranchPrepareMergeBackCMD())
defer rows2.Close()
printBranchDiff(rows2)

// branch show
showStatus := "branch show;"
rowsStatus := framework.QueryNoError(t, targetCluster.WescaleDb, showStatus)
defer rowsStatus.Close()
printBranchShowStatus(rowsStatus)

showSnapshot := getBranchShowCMD("snapshot")
rowsSnapshot := framework.QueryNoError(t, targetCluster.WescaleDb, showSnapshot)
defer rowsSnapshot.Close()
printBranchShowSnapshot(rowsSnapshot)

showMergeBackDDL := getBranchShowCMD("merge_back_ddl")
rowsMergeBackDDL := framework.QueryNoError(t, targetCluster.WescaleDb, showMergeBackDDL)
defer rowsMergeBackDDL.Close()
printBranchShowMergeBackDDL(rowsMergeBackDDL)

// branch merge
framework.ExecNoError(t, targetCluster.WescaleDb, getBranchMergeBackCMD())

Expand All @@ -290,6 +424,8 @@ func TestBranchBasic(t *testing.T) {
assert.Equal(t, true, framework.CheckColumnExists(t, sourceCluster.WescaleDb, "test_db3", "target_products", "description"))
assert.Equal(t, true, framework.CheckColumnExists(t, sourceCluster.WescaleDb, "test_db2", "orders", "description"))

assert.Equal(t, false, framework.CheckDatabaseExists(t, sourceCluster.WescaleDb, "test_db4"))
assert.Equal(t, true, framework.CheckTableExists(t, sourceCluster.WescaleDb, "target_db", "target_new_table"))
}

func TestBranchBasicWithFailPoint(t *testing.T) {
Expand All @@ -298,24 +434,24 @@ func TestBranchBasicWithFailPoint(t *testing.T) {
targetPrepare()

// defer cleanup
defer framework.ExecNoError(t, targetCluster.WescaleDb, getBranchCleanUpCMD())
defer framework.ExecNoError(t, targetCluster.WescaleDb, getBranchDeleteCMD())
defer sourceClean()
defer targetClean()

// create branch
createCMD := getBranchCreateCMD(sourceHostToTarget, sourceCluster.MysqlPort, "root", "passwd", "*", "information_schema,mysql,performance_schema,sys")
framework.EnableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/branch/BranchFetchSnapshotError", "return(true)")
framework.ExecWithErrorContains(t, targetCluster.WescaleDb, "failpoint", createCMD)
expectBranchStatus(t, "my_branch", "init")
expectBranchStatus(t, "origin", "init")

framework.DisableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/branch/BranchFetchSnapshotError")
framework.EnableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/branch/BranchApplySnapshotError", "return(true)")
framework.ExecWithErrorContains(t, targetCluster.WescaleDb, "failpoint", createCMD)
expectBranchStatus(t, "my_branch", "fetched")
expectBranchStatus(t, "origin", "fetched")
framework.DisableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/branch/BranchApplySnapshotError")

framework.ExecNoError(t, targetCluster.WescaleDb, createCMD)
expectBranchStatus(t, "my_branch", "created")
expectBranchStatus(t, "origin", "created")

assert.Equal(t, true, framework.CheckTableExists(t, targetCluster.WescaleDb, "test_db1", "users"))
assert.Equal(t, true, framework.CheckTableExists(t, targetCluster.WescaleDb, "test_db2", "orders"))
Expand Down Expand Up @@ -345,22 +481,22 @@ func TestBranchBasicWithFailPoint(t *testing.T) {
// branch prepare merge back
framework.EnableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/branch/BranchInsertMergeBackDDLError", "return(true)")
framework.ExecWithErrorContains(t, targetCluster.WescaleDb, "failpoint", getBranchPrepareMergeBackCMD())
expectBranchStatus(t, "my_branch", "preparing")
expectBranchStatus(t, "origin", "preparing")
framework.DisableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/branch/BranchInsertMergeBackDDLError")

rows2 := framework.QueryNoError(t, targetCluster.WescaleDb, getBranchPrepareMergeBackCMD())
defer rows2.Close()
printBranchDiff(rows2)
expectBranchStatus(t, "my_branch", "prepared")
expectBranchStatus(t, "origin", "prepared")

// branch merge
framework.EnableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/branch/BranchExecuteMergeBackDDLError", "return(true)")
framework.ExecWithErrorContains(t, targetCluster.WescaleDb, "failpoint", getBranchMergeBackCMD())
expectBranchStatus(t, "my_branch", "merging")
expectBranchStatus(t, "origin", "merging")

framework.DisableFailPoint(t, targetCluster.WescaleDb, "vitess.io/vitess/go/vt/vtgate/branch/BranchExecuteMergeBackDDLError")
framework.ExecNoError(t, targetCluster.WescaleDb, getBranchMergeBackCMD())
expectBranchStatus(t, "my_branch", "merged")
expectBranchStatus(t, "origin", "merged")

// no diff
rows3 := framework.QueryNoError(t, targetCluster.WescaleDb, getBranchDiffCMD("source_target"))
Expand Down
18 changes: 18 additions & 0 deletions endtoend/framework/sql_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func CheckTableExists(t *testing.T, db *sql.DB, schema string, table string) boo
return false
}

// CheckColumnExists checks if a specific column exists in a given table.
func CheckColumnExists(t *testing.T, db *sql.DB, schema, table, column string) bool {
t.Helper()
query := fmt.Sprintf(`SELECT COUNT(1) FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND column_name = '%s'`, schema, table, column)
Expand All @@ -74,6 +75,23 @@ func CheckColumnExists(t *testing.T, db *sql.DB, schema, table, column string) b
return count > 0
}

// CheckDatabaseExists checks if a specific database exists in the MySQL server.
func CheckDatabaseExists(t *testing.T, db *sql.DB, databaseName string) bool {
t.Helper()
query := fmt.Sprintf(`SELECT COUNT(1) FROM information_schema.schemata WHERE schema_name = '%s'`, databaseName)

rows := QueryNoError(t, db, query)
defer rows.Close()

var count int
if rows.Next() {
err := rows.Scan(&count)
assert.NoError(t, err)
}

return count > 0
}

func EnableFailPoint(t *testing.T, db *sql.DB, key, value string) {
t.Helper()
query := fmt.Sprintf("set @put_failpoint='%s=%s'", key, value)
Expand Down
4 changes: 3 additions & 1 deletion go/internal/global/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ Licensed under the Apache v2(found in the LICENSE file in the root directory).

package global

import "time"
import (
"time"
)

// Keyspace
const (
Expand Down
31 changes: 31 additions & 0 deletions go/viperutil/vtgate_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,35 @@ func RegisterReloadHandlersForVtGate(v *ViperConfig) {
log.Errorf("fail to reload config %s=%s, err: %v", key, value, err)
}
})

// branch
v.ReloadHandler.AddReloadHandler("branch_default_name", func(key string, value string, fs *pflag.FlagSet) {
if err := fs.Set("branch_default_name", value); err != nil {
log.Errorf("fail to reload config %s=%s, err: %v", key, value, err)
}
})

v.ReloadHandler.AddReloadHandler("branch_default_target_host", func(key string, value string, fs *pflag.FlagSet) {
if err := fs.Set("branch_default_target_host", value); err != nil {
log.Errorf("fail to reload config %s=%s, err: %v", key, value, err)
}
})

v.ReloadHandler.AddReloadHandler("branch_default_target_port", func(key string, value string, fs *pflag.FlagSet) {
if err := fs.Set("branch_default_target_port", value); err != nil {
log.Errorf("fail to reload config %s=%s, err: %v", key, value, err)
}
})

v.ReloadHandler.AddReloadHandler("branch_default_target_user", func(key string, value string, fs *pflag.FlagSet) {
if err := fs.Set("branch_default_target_user", value); err != nil {
log.Errorf("fail to reload config %s=%s, err: %v", key, value, err)
}
})

v.ReloadHandler.AddReloadHandler("branch_default_target_password", func(key string, value string, fs *pflag.FlagSet) {
if err := fs.Set("branch_default_target_password", value); err != nil {
log.Errorf("fail to reload config %s=%s, err: %v", key, value, err)
}
})
}
Loading
Loading