Skip to content

Commit

Permalink
Merge pull request #8067 from planetscale/schema-fetch
Browse files Browse the repository at this point in the history
Update the schema copy with minimal changes
  • Loading branch information
systay authored May 11, 2021
2 parents 7a7a7c7 + 2116b85 commit 5599bc8
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 17 deletions.
36 changes: 31 additions & 5 deletions go/mysql/endtoend/schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ package endtoend

import (
"context"
"fmt"
"strings"
"testing"

"vitess.io/vitess/go/vt/sqlparser"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -79,6 +83,9 @@ func TestChangeSchemaIsNoticed(t *testing.T) {
}, {
name: "change PK",
changeQ: "alter table vttest.product drop primary key, add primary key (name)",
}, {
name: "two tables changes",
changeQ: "create table vttest.new_table2 (id bigint(20) primary key);alter table vttest.product drop column name",
}}

for _, test := range tests {
Expand All @@ -99,16 +106,35 @@ func TestChangeSchemaIsNoticed(t *testing.T) {
require.NoError(t, err)
require.Empty(t, rs.Rows)

// make the schema change
_, err = conn.ExecuteFetch(test.changeQ, 1000, true)
require.NoError(t, err)
for _, q := range strings.Split(test.changeQ, ";") {
// make the schema change
_, err = conn.ExecuteFetch(q, 1000, true)
require.NoError(t, err)
}

// make sure the change is detected
rs, err = conn.ExecuteFetch(mysql.DetectSchemaChange, 1000, true)
require.NoError(t, err)
require.NotEmpty(t, rs.Rows)
})

}
var tables []string
for _, row := range rs.Rows {
apa := sqlparser.NewStrLiteral(row[0].ToString())
tables = append(tables, "table_name = "+sqlparser.String(apa))
}
tableNamePredicates := strings.Join(tables, " OR ")
del := fmt.Sprintf("%s WHERE %s", mysql.ClearSchemaCopy, tableNamePredicates)
upd := fmt.Sprintf("%s AND %s", mysql.InsertIntoSchemaCopy, tableNamePredicates)

_, err = conn.ExecuteFetch(del, 1000, true)
require.NoError(t, err)
_, err = conn.ExecuteFetch(upd, 1000, true)
require.NoError(t, err)

// make sure the change is detected
rs, err = conn.ExecuteFetch(mysql.DetectSchemaChange, 1000, true)
require.NoError(t, err)
require.Empty(t, rs.Rows)
})
}
}
39 changes: 29 additions & 10 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ import (
"flag"
"fmt"
"io"
"strings"
"sync"
"time"

"vitess.io/vitess/go/sqltypes"

"vitess.io/vitess/go/vt/sqlparser"

"vitess.io/vitess/go/vt/dbconfigs"

"vitess.io/vitess/go/mysql"
Expand Down Expand Up @@ -323,30 +328,49 @@ func (hs *healthStreamer) reload() error {
}
}

// TODO: fix the maxrows from using a magic number.
qr, err := conn.Exec(ctx, mysql.DetectSchemaChange, 10000, false)
var tables []string
var tableNames []string

callback := func(qr *sqltypes.Result) error {
for _, row := range qr.Rows {
table := row[0].ToString()
tables = append(tables, table)

escapedTblName := sqlparser.String(sqlparser.NewStrLiteral(table))
tableNames = append(tableNames, escapedTblName)
}

return nil
}
alloc := func() *sqltypes.Result { return &sqltypes.Result{} }
bufferSize := 1000
err = conn.Stream(ctx, mysql.DetectSchemaChange, callback, alloc, bufferSize, 0)
if err != nil {
return err
}

// If no change detected, then return
if len(qr.Rows) == 0 {
if len(tables) == 0 {
return nil
}

tableNamePredicate := fmt.Sprintf("table_name IN (%s)", strings.Join(tableNames, ", "))
del := fmt.Sprintf("%s WHERE %s", mysql.ClearSchemaCopy, tableNamePredicate)
upd := fmt.Sprintf("%s AND %s", mysql.InsertIntoSchemaCopy, tableNamePredicate)

// Reload the schema in a transaction.
_, err = conn.Exec(ctx, "begin", 1, false)
if err != nil {
return err
}
defer conn.Exec(ctx, "rollback", 1, false)

_, err = conn.Exec(ctx, mysql.ClearSchemaCopy, 1, false)
_, err = conn.Exec(ctx, del, 1, false)
if err != nil {
return err
}

_, err = conn.Exec(ctx, mysql.InsertIntoSchemaCopy, 1, false)
_, err = conn.Exec(ctx, upd, 1, false)
if err != nil {
return err
}
Expand All @@ -356,11 +380,6 @@ func (hs *healthStreamer) reload() error {
return err
}

// publish only if changes are committed.
var tables []string
for _, row := range qr.Rows {
tables = append(tables, row[0].ToString())
}
hs.state.RealtimeStats.TableSchemaChanged = tables
shr := proto.Clone(hs.state).(*querypb.StreamHealthResponse)
hs.broadCastToClients(shr)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/health_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func TestReloadSchema(t *testing.T) {

db.AddQuery(mysql.CreateVTDatabase, &sqltypes.Result{})
db.AddQuery(mysql.CreateSchemaCopyTable, &sqltypes.Result{})
db.AddQuery(mysql.ClearSchemaCopy, &sqltypes.Result{})
db.AddQuery(mysql.InsertIntoSchemaCopy, &sqltypes.Result{})
db.AddQueryPattern(mysql.ClearSchemaCopy+".*", &sqltypes.Result{})
db.AddQueryPattern(mysql.InsertIntoSchemaCopy+".*", &sqltypes.Result{})
db.AddQuery("begin", &sqltypes.Result{})
db.AddQuery("commit", &sqltypes.Result{})
db.AddQuery("rollback", &sqltypes.Result{})
Expand Down

0 comments on commit 5599bc8

Please sign in to comment.