precheck(dm): fix MySQL 8.0 SHOW GRANTS with role (#6485)
close #6448, close #6506
lance6716 authored Aug 2, 2022
1 parent 750b56c commit f34c680
Showing 12 changed files with 164 additions and 87 deletions.
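Background for the fix: on MySQL 8.0, the SHOW GRANTS output for a user that has been granted a role contains an extra role-grant line with no ON clause, which DM's precheck previously mishandled (see #6448, #6506). The sketch below is illustrative only — it is not the precheck code, which parses the statements with the TiDB SQL parser — it just shows the two statement shapes a checker has to tell apart.

package main

import (
	"fmt"
	"strings"
)

// isRoleGrant reports whether a SHOW GRANTS line is a MySQL 8.0 role grant,
// e.g. GRANT `test-8.0`@`%` TO `dm_full`@`%`. Unlike a privilege grant
// (GRANT ... ON *.* TO ...), a role grant carries no ON clause.
func isRoleGrant(line string) bool {
	upper := strings.ToUpper(line)
	return strings.HasPrefix(upper, "GRANT ") && !strings.Contains(upper, " ON ")
}

func main() {
	for _, g := range []string{
		"GRANT ALL PRIVILEGES ON *.* TO `dm_full`@`%`",
		"GRANT `test-8.0`@`%` TO `dm_full`@`%`",
	} {
		fmt.Printf("%-50s role grant: %v\n", g, isRoleGrant(g))
	}
}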
40 changes: 40 additions & 0 deletions cdc/entry/schema/snapshot.go
@@ -22,6 +22,11 @@ import (
"github.com/google/btree"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"

@@ -117,6 +122,7 @@ func NewSingleSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, forceReplica
// meta is nil only in unit tests
if meta == nil {
snap := NewEmptySnapshot(forceReplicate)
snap.InitConcurrentDDLTables()
snap.inner.currentTs = currentTs
return snap, nil
}
@@ -194,9 +200,43 @@ func NewEmptySnapshot(forceReplicate bool) *Snapshot {
forceReplicate: forceReplicate,
currentTs: 0,
}

return &Snapshot{inner: inner, rwlock: new(sync.RWMutex)}
}

// these constants are used to imitate TiDB's session.InitDDLJobTables in an empty Snapshot.
const (
mysqlDBID = int64(1)
dummyTS = uint64(1)
)

// InitConcurrentDDLTables imitates the table-creation logic for concurrent DDL.
// Since v6.2.0, the tables used by concurrent DDL are written directly as meta KV
// into TiKV instead of going through history DDL jobs, so a Snapshot that is not
// built from meta needs this method before it can handle history DDL.
func (s *Snapshot) InitConcurrentDDLTables() {
tableIDs := [...]int64{ddl.JobTableID, ddl.ReorgTableID, ddl.HistoryTableID}

mysqlDBInfo := &timodel.DBInfo{
ID: mysqlDBID,
Name: timodel.NewCIStr(mysql.SystemDB),
Charset: mysql.UTF8MB4Charset,
Collate: mysql.UTF8MB4DefaultCollation,
State: timodel.StatePublic,
}
_ = s.inner.createSchema(mysqlDBInfo, dummyTS)

p := parser.New()
for i, table := range session.DDLJobTables {
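// table.SQL is a CREATE TABLE statement maintained by TiDB itself,
// so the parse/build errors below are not expected and are ignored.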
stmt, _ := p.ParseOneStmt(table.SQL, "", "")
tblInfo, _ := ddl.BuildTableInfoFromAST(stmt.(*ast.CreateTableStmt))
tblInfo.State = timodel.StatePublic
tblInfo.ID = tableIDs[i]
wrapped := model.WrapTableInfo(mysqlDBID, mysql.SystemDB, dummyTS, tblInfo)
_ = s.inner.createTable(wrapped, dummyTS)
}
}

// Copy creates a new schema snapshot based on the given one. The copied one shares the same internal
// data structures with the old one to save memory.
func (s *Snapshot) Copy() *Snapshot {
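A minimal usage sketch of the new method (names as in the diff above; error handling elided): seeding an empty snapshot makes the three concurrent-DDL system tables resolvable by their fixed IDs.

snap := schema.NewEmptySnapshot(false)
snap.InitConcurrentDDLTables()
for _, id := range []int64{ddl.JobTableID, ddl.ReorgTableID, ddl.HistoryTableID} {
	if _, ok := snap.PhysicalTableByID(id); !ok {
		panic("concurrent-DDL table missing from snapshot")
	}
}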
2 changes: 2 additions & 0 deletions cdc/entry/schema_storage.go
@@ -76,6 +76,7 @@ func NewSchemaStorage(
)
if meta == nil {
snap = schema.NewEmptySnapshot(forceReplicate)
snap.InitConcurrentDDLTables()
} else {
snap, err = schema.NewSnapshotFromMeta(meta, startTs, forceReplicate)
if err != nil {
@@ -189,6 +190,7 @@ func (s *schemaStorageImpl) HandleDDLJob(job *timodel.Job) error {
snap = lastSnap.Copy()
} else {
snap = schema.NewEmptySnapshot(s.forceReplicate)
snap.InitConcurrentDDLTables()
}
if err := snap.HandleDDL(job); err != nil {
log.Error("handle DDL failed", zap.String("DDL", job.Query),
85 changes: 43 additions & 42 deletions cdc/entry/schema_storage_test.go
@@ -485,16 +485,16 @@ func TestMultiVersionStorage(t *testing.T) {
tbName := timodel.NewCIStr("T1")
// db and ignoreDB info
dbInfo := &timodel.DBInfo{
ID: 1,
ID: 11,
Name: dbName,
State: timodel.StatePublic,
}
var jobs []*timodel.Job
// `createSchema` job1
job := &timodel.Job{
ID: 3,
ID: 13,
State: timodel.JobStateSynced,
SchemaID: 1,
SchemaID: 11,
Type: timodel.ActionCreateSchema,
BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 1, DBInfo: dbInfo, FinishedTS: 100},
Query: "create database test",
@@ -503,17 +503,17 @@

// table info
tblInfo := &timodel.TableInfo{
ID: 2,
ID: 12,
Name: tbName,
State: timodel.StatePublic,
}

// `createTable` job
job = &timodel.Job{
ID: 6,
ID: 16,
State: timodel.JobStateSynced,
SchemaID: 1,
TableID: 2,
SchemaID: 11,
TableID: 12,
Type: timodel.ActionCreateTable,
BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 2, TableInfo: tblInfo, FinishedTS: 110},
Query: "create table " + tbName.O,
@@ -524,16 +524,16 @@
tbName = timodel.NewCIStr("T2")
// table info
tblInfo = &timodel.TableInfo{
ID: 3,
ID: 13,
Name: tbName,
State: timodel.StatePublic,
}
// `createTable` job
job = &timodel.Job{
ID: 6,
ID: 16,
State: timodel.JobStateSynced,
SchemaID: 1,
TableID: 3,
SchemaID: 11,
TableID: 13,
Type: timodel.ActionCreateTable,
BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 3, TableInfo: tblInfo, FinishedTS: 120},
Query: "create table " + tbName.O,
@@ -549,10 +549,10 @@

// `dropTable` job
job = &timodel.Job{
ID: 6,
ID: 16,
State: timodel.JobStateSynced,
SchemaID: 1,
TableID: 2,
SchemaID: 11,
TableID: 12,
Type: timodel.ActionDropTable,
BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 4, FinishedTS: 130},
}
@@ -562,9 +562,9 @@

// `dropSchema` job
job = &timodel.Job{
ID: 6,
ID: 16,
State: timodel.JobStateSynced,
SchemaID: 1,
SchemaID: 11,
Type: timodel.ActionDropSchema,
BinlogInfo: &timodel.HistoryInfo{SchemaVersion: 5, FinishedTS: 140, DBInfo: dbInfo},
}
@@ -575,70 +575,71 @@
require.Equal(t, storage.(*schemaStorageImpl).resolvedTs, uint64(140))
snap, err := storage.GetSnapshot(ctx, 100)
require.Nil(t, err)
_, exist := snap.SchemaByID(1)
_, exist := snap.SchemaByID(11)
require.True(t, exist)
_, exist = snap.PhysicalTableByID(2)
_, exist = snap.PhysicalTableByID(12)
require.False(t, exist)
_, exist = snap.PhysicalTableByID(3)
_, exist = snap.PhysicalTableByID(13)
require.False(t, exist)

snap, err = storage.GetSnapshot(ctx, 115)
require.Nil(t, err)
_, exist = snap.SchemaByID(1)
_, exist = snap.SchemaByID(11)
require.True(t, exist)
_, exist = snap.PhysicalTableByID(2)
_, exist = snap.PhysicalTableByID(12)
require.True(t, exist)
_, exist = snap.PhysicalTableByID(3)
_, exist = snap.PhysicalTableByID(13)
require.False(t, exist)

snap, err = storage.GetSnapshot(ctx, 125)
require.Nil(t, err)
_, exist = snap.SchemaByID(1)
_, exist = snap.SchemaByID(11)
require.True(t, exist)
_, exist = snap.PhysicalTableByID(2)
_, exist = snap.PhysicalTableByID(12)
require.True(t, exist)
_, exist = snap.PhysicalTableByID(3)
_, exist = snap.PhysicalTableByID(13)
require.True(t, exist)

snap, err = storage.GetSnapshot(ctx, 135)
require.Nil(t, err)
_, exist = snap.SchemaByID(1)
_, exist = snap.SchemaByID(11)
require.True(t, exist)
_, exist = snap.PhysicalTableByID(2)
_, exist = snap.PhysicalTableByID(12)
require.False(t, exist)
_, exist = snap.PhysicalTableByID(3)
_, exist = snap.PhysicalTableByID(13)
require.True(t, exist)

snap, err = storage.GetSnapshot(ctx, 140)
require.Nil(t, err)
_, exist = snap.SchemaByID(1)
_, exist = snap.SchemaByID(11)
require.False(t, exist)
_, exist = snap.PhysicalTableByID(2)
_, exist = snap.PhysicalTableByID(12)
require.False(t, exist)
_, exist = snap.PhysicalTableByID(3)
_, exist = snap.PhysicalTableByID(13)
require.False(t, exist)

lastSchemaTs := storage.DoGC(0)
require.Equal(t, uint64(0), lastSchemaTs)
// Snapshot.InitConcurrentDDLTables will create a schema with ts = 1
require.Equal(t, uint64(1), lastSchemaTs)

snap, err = storage.GetSnapshot(ctx, 100)
require.Nil(t, err)
_, exist = snap.SchemaByID(1)
_, exist = snap.SchemaByID(11)
require.True(t, exist)
_, exist = snap.PhysicalTableByID(2)
_, exist = snap.PhysicalTableByID(12)
require.False(t, exist)
_, exist = snap.PhysicalTableByID(3)
_, exist = snap.PhysicalTableByID(13)
require.False(t, exist)
storage.DoGC(115)
_, err = storage.GetSnapshot(ctx, 100)
require.NotNil(t, err)
snap, err = storage.GetSnapshot(ctx, 115)
require.Nil(t, err)
_, exist = snap.SchemaByID(1)
_, exist = snap.SchemaByID(11)
require.True(t, exist)
_, exist = snap.PhysicalTableByID(2)
_, exist = snap.PhysicalTableByID(12)
require.True(t, exist)
_, exist = snap.PhysicalTableByID(3)
_, exist = snap.PhysicalTableByID(13)
require.False(t, exist)

lastSchemaTs = storage.DoGC(155)
Expand All @@ -648,11 +649,11 @@ func TestMultiVersionStorage(t *testing.T) {

snap, err = storage.GetSnapshot(ctx, 180)
require.Nil(t, err)
_, exist = snap.SchemaByID(1)
_, exist = snap.SchemaByID(11)
require.False(t, exist)
_, exist = snap.PhysicalTableByID(2)
_, exist = snap.PhysicalTableByID(12)
require.False(t, exist)
_, exist = snap.PhysicalTableByID(3)
_, exist = snap.PhysicalTableByID(13)
require.False(t, exist)
_, err = storage.GetSnapshot(ctx, 130)
require.NotNil(t, err)
@@ -734,7 +735,7 @@ func TestExplicitTables(t *testing.T) {
require.GreaterOrEqual(t, snap2.TableCount(false), 4)

require.Equal(t, snap3.TableCount(true)-snap1.TableCount(true), 5)
require.Equal(t, snap3.TableCount(false), 37)
require.Equal(t, snap3.TableCount(false), 40)
}

/*
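Why the TestExplicitTables expectation moves from 37 to 40: the snapshot now also contains the three tables seeded by InitConcurrentDDLTables (mysql.tidb_ddl_job, mysql.tidb_ddl_reorg and mysql.tidb_ddl_history), so the total table count grows by exactly three. A hedged sketch of that invariant:

before := schema.NewEmptySnapshot(false)
after := schema.NewEmptySnapshot(false)
after.InitConcurrentDDLTables()
// expected delta: exactly the three seeded concurrent-DDL tables
fmt.Println(after.TableCount(false) - before.TableCount(false)) // 3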
12 changes: 6 additions & 6 deletions dm/dumpling/dumpling.go
@@ -268,13 +268,13 @@ func (m *Dumpling) Status(_ *binlog.SourceStatus) interface{} {
}

func (m *Dumpling) status() *pb.DumpStatus {
mid := m.core.GetParameters()
dumpStatus := m.core.GetStatus()
s := &pb.DumpStatus{
TotalTables: mid.TotalTables,
CompletedTables: mid.CompletedTables,
FinishedBytes: mid.FinishedBytes,
FinishedRows: mid.FinishedRows,
EstimateTotalRows: mid.EstimateTotalRows,
TotalTables: dumpStatus.TotalTables,
CompletedTables: dumpStatus.CompletedTables,
FinishedBytes: dumpStatus.FinishedBytes,
FinishedRows: dumpStatus.FinishedRows,
EstimateTotalRows: dumpStatus.EstimateTotalRows,
}
var estimateProgress string
if s.FinishedRows >= s.EstimateTotalRows {
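The rename from mid to dumpStatus matches the renamed m.core.GetStatus() and makes the copied fields self-describing. The truncated tail of status() presumably formats a percentage from these counters; a rough sketch (formatting assumed, not copied from DM):

var estimateProgress string
if s.FinishedRows >= s.EstimateTotalRows {
	estimateProgress = "100.00 %"
} else {
	estimateProgress = fmt.Sprintf("%.2f %%",
		float64(s.FinishedRows)/float64(s.EstimateTotalRows)*100)
}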
2 changes: 1 addition & 1 deletion dm/syncer/syncer.go
@@ -3564,7 +3564,7 @@ func (s *Syncer) flushOptimisticTableInfos(tctx *tcontext.Context) {
targetTable := tbl[1]
tableInfo, err := s.getTableInfo(tctx, &sourceTable, &targetTable)
if err != nil {
tctx.L().Error("failed to get table infos", log.ShortError(err))
tctx.L().Error("failed to get table infos", log.ShortError(err))
continue
}
sourceTables = append(sourceTables, &sourceTable)
3 changes: 3 additions & 0 deletions dm/tests/full_mode/data/db2.prepare.user.sql
@@ -1,7 +1,10 @@
drop user if exists 'dm_full';
drop role if exists 'test-8.0';
flush privileges;
create user 'dm_full'@'%' identified by '123456';
grant all privileges on *.* to 'dm_full'@'%';
revoke replication slave, replication client, super on *.* from 'dm_full'@'%';
revoke create temporary tables, lock tables, create routine, alter routine, event, create tablespace, file, shutdown, execute, process, index on *.* from 'dm_full'@'%'; # privileges not supported by TiDB
create role 'test-8.0';
grant 'test-8.0' to 'dm_full';
flush privileges;
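The new role exercises the fixed code path: once 'test-8.0' is granted to dm_full, MySQL 8.0's SHOW GRANTS output for that user includes a role-grant line, and the role's own privileges are only expanded when a USING clause is supplied. A self-contained sketch of issuing both queries (database/sql with the go-sql-driver/mysql driver assumed; connection setup omitted):

// showGrants lists the grants of dm_full, optionally expanding the role's
// privileges via MySQL 8.0's USING clause.
func showGrants(db *sql.DB, expandRole bool) ([]string, error) {
	q := "SHOW GRANTS FOR 'dm_full'@'%'"
	if expandRole {
		q += " USING 'test-8.0'"
	}
	rows, err := db.Query(q)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var grants []string
	for rows.Next() {
		var g string
		if err := rows.Scan(&g); err != nil {
			return nil, err
		}
		grants = append(grants, g)
	}
	return grants, rows.Err()
}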
9 changes: 5 additions & 4 deletions dm/tests/full_mode/run.sh
@@ -21,7 +21,7 @@ function fail_acquire_global_lock() {
cp $cur/data/db2.prepare.user.sql $WORK_DIR/db2.prepare.user.sql
sed -i "/revoke create temporary/i\revoke reload on *.* from 'dm_full'@'%';" $WORK_DIR/db2.prepare.user.sql
run_sql_file $WORK_DIR/db2.prepare.user.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_count 'Query OK, 0 rows affected' 8
check_count 'Query OK, 0 rows affected' 11

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
@@ -53,7 +53,8 @@ function fail_acquire_global_lock() {
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Paused\"" 2 \
"you need (at least one of) the RELOAD privilege(s) for this operation" 2
"LOCK TABLES \`full_mode\`.\`t1\` READ: Error 1044: Access denied" 1 \
"LOCK TABLES \`full_mode\`.\`t2\` READ: Error 1044: Access denied" 1

cleanup_process $*
cleanup_data full_mode
Expand All @@ -80,7 +81,7 @@ function escape_schema() {
run_sql_file $cur/data/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_count 'Query OK, 0 rows affected' 7
run_sql_file $cur/data/db2.prepare.user.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_count 'Query OK, 0 rows affected' 7
check_count 'Query OK, 0 rows affected' 10

export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/dumpling/SleepBeforeDumplingClose=return(3)'

@@ -178,7 +179,7 @@ function run() {
run_sql_file $cur/data/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_count 'Query OK, 0 rows affected' 7
run_sql_file $cur/data/db2.prepare.user.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_count 'Query OK, 0 rows affected' 7
check_count 'Query OK, 0 rows affected' 10

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
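Note on the changed expectation above: with RELOAD revoked, the dump can no longer take FLUSH TABLES WITH READ LOCK and appears to fall back to per-table LOCK TABLES, which then fails with Error 1044 because the test user also lacks the LOCK TABLES privilege — hence the two table-specific messages replacing the single RELOAD-privilege error.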
5 changes: 4 additions & 1 deletion dm/tests/incremental_mode/run.sh
@@ -171,6 +171,9 @@ function run() {
# check reset binlog puller success
grep -Fq "reset replication binlog puller" $WORK_DIR/worker1/log/dm-worker.log
grep -Fq "reset replication binlog puller" $WORK_DIR/worker2/log/dm-worker.log

check_log_contain_with_retry 'finish to handle ddls in normal mode.*create table t2' $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log

# we use a failpoint to let the worker sleep 8 seconds in executeSQLs, to increase the
# possibility of meeting a context-cancel error.
# when the check below passes, it means we filtered out that error, or it didn't happen.
@@ -200,7 +203,7 @@ function run() {
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $WORK_DIR/dm-task.yaml"

# the task should paused by `FlushCheckpointStage` failpont before flush old checkpoint.
# the task should be paused by the `FlushCheckpointStage` failpoint before flushing the old checkpoint.
# `db2.increment.sql` has no DDL, so we check that the content count is `1`.
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
2 changes: 1 addition & 1 deletion dm/tests/many_tables/run.sh
@@ -91,7 +91,7 @@ function run() {
check_metric $WORKER1_PORT 'lightning_tables{result="success",source_id="mysql-replica-01",state="completed",task="test"}' 1 499 501

# check https://github.com/pingcap/tiflow/issues/5063
check_time=20
check_time=100
sleep 5
while [ $check_time -gt 0 ]; do
syncer_recv_event_num=$(grep '"receive binlog event"' $WORK_DIR/worker1/log/dm-worker.log | wc -l)
(3 more changed files not shown)
