Skip to content

Commit

Permalink
VReplication: use db_filtered user for vstreams (#10080)
Browse files Browse the repository at this point in the history
This way vreplication is using the same user on the source (streamer) and
target (player).

And set the filtered user config in the binlog server streamer and
the tabletmanager restore.

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Apr 20, 2022
1 parent 2e272fe commit 93f4490
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 25 deletions.
5 changes: 2 additions & 3 deletions config/init_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ GRANT GRANT OPTION ON *.* TO 'vt_dba'@'localhost';
CREATE USER 'vt_app'@'localhost';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW,
LOCK TABLES, EXECUTE, REPLICATION CLIENT, CREATE VIEW,
SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
ON *.* TO 'vt_app'@'localhost';

Expand All @@ -75,8 +75,7 @@ GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
CREATE USER 'vt_repl'@'%';
GRANT REPLICATION SLAVE ON *.* TO 'vt_repl'@'%';

# User for Vitess filtered replication (binlog player).
# Same permissions as vt_app.
# User for Vitess VReplication (base vstreamers and vplayer).
CREATE USER 'vt_filtered'@'localhost';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
Expand Down
7 changes: 3 additions & 4 deletions examples/compose/config/init_db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ GRANT GRANT OPTION ON *.* TO 'vt_dba'@'localhost';
CREATE USER 'vt_app'@'localhost';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW,
SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
LOCK TABLES, EXECUTE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW,
CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
ON *.* TO 'vt_app'@'localhost';
# User for app debug traffic, with global read access.
CREATE USER 'vt_appdebug'@'localhost';
Expand All @@ -57,8 +57,7 @@ GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
# TODO: Should we set a password on this since it allows remote connections?
CREATE USER 'vt_repl'@'%';
GRANT REPLICATION SLAVE ON *.* TO 'vt_repl'@'%';
# User for Vitess filtered replication (binlog player).
# Same permissions as vt_app.
# User for Vitess VReplication (base vstreamers and vplayer).
CREATE USER 'vt_filtered'@'localhost';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
Expand Down
7 changes: 3 additions & 4 deletions examples/operator/101_initial_cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ stringData:
CREATE USER 'vt_app'@'localhost';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW,
SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
LOCK TABLES, EXECUTE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW,
CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
ON *.* TO 'vt_app'@'localhost';
# User for app debug traffic, with global read access.
Expand All @@ -168,8 +168,7 @@ stringData:
CREATE USER 'vt_repl'@'%';
GRANT REPLICATION SLAVE ON *.* TO 'vt_repl'@'%';
# User for Vitess filtered replication (binlog player).
# Same permissions as vt_app.
# User for Vitess VReplication (base vstreamers and vplayer).
CREATE USER 'vt_filtered'@'localhost';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
Expand Down
7 changes: 3 additions & 4 deletions examples/operator/vtorc_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ stringData:
CREATE USER 'vt_app'@'localhost';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW,
SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
LOCK TABLES, EXECUTE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW,
CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
ON *.* TO 'vt_app'@'localhost';
# User for app debug traffic, with global read access.
Expand All @@ -177,8 +177,7 @@ stringData:
CREATE USER 'vt_repl'@'%';
GRANT REPLICATION SLAVE ON *.* TO 'vt_repl'@'%';
# User for Vitess filtered replication (binlog player).
# Same permissions as vt_app.
# User for Vitess VReplication (base vstreamers and vplayer).
CREATE USER 'vt_filtered'@'localhost';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
Expand Down
7 changes: 3 additions & 4 deletions go/test/endtoend/sharding/initialsharding/sharding_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,8 @@ GRANT GRANT OPTION ON *.* TO 'vt_dba'@'127.0.0.1';
CREATE USER 'vt_app'@'127.0.0.1' IDENTIFIED BY 'VtAppPass';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW,
SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
LOCK TABLES, EXECUTE, REPLICATION CLIENT, CREATE VIEW, SHOW VIEW,
CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
ON *.* TO 'vt_app'@'127.0.0.1';
# User for administrative operations that need to be executed as non-SUPER.
# Same permissions as vt_app here.
Expand All @@ -657,8 +657,7 @@ GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
LOCK TABLES, EXECUTE, REPLICATION SLAVE, REPLICATION CLIENT, CREATE VIEW,
SHOW VIEW, CREATE ROUTINE, ALTER ROUTINE, CREATE USER, EVENT, TRIGGER
ON *.* TO 'vt_allprivs'@'127.0.0.1';
# User for Vitess filtered replication (binlog player).
# Same permissions as vt_app.
# User for Vitess VReplication (base vstreamers and vplayer).
CREATE USER 'vt_filtered'@'127.0.0.1' IDENTIFIED BY 'VtFilteredPass';
GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, RELOAD, PROCESS, FILE,
REFERENCES, INDEX, ALTER, SHOW DATABASES, CREATE TEMPORARY TABLES,
Expand Down
3 changes: 2 additions & 1 deletion go/vt/dbconfigs/dbconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,10 @@ func (dbcfgs *DBConfigs) getParams(userKey string, dbc *DBConfigs) (*UserConfig,
}

// SetDbParams sets the dba and app params
func (dbcfgs *DBConfigs) SetDbParams(dbaParams, appParams mysql.ConnParams) {
func (dbcfgs *DBConfigs) SetDbParams(dbaParams, appParams, filteredParams mysql.ConnParams) {
dbcfgs.dbaParams = dbaParams
dbcfgs.appParams = appParams
dbcfgs.filteredParams = filteredParams
}

// NewTestDBConfigs returns a DBConfigs meant for testing.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (tm *TabletManager) getGTIDFromTimestamp(ctx context.Context, pos mysql.Pos
Host: connParams.Host,
Port: connParams.Port,
}
dbCfgs.SetDbParams(*connParams, *connParams)
dbCfgs.SetDbParams(*connParams, *connParams, *connParams)
vsClient := vreplication.NewReplicaConnector(connParams)

filter := &binlogdatapb.Filter{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewReplicaConnector(connParams *mysql.ConnParams) *ReplicaConnector {
Host: connParams.Host,
Port: connParams.Port,
}
dbCfg.SetDbParams(*connParams, *connParams)
dbCfg.SetDbParams(*connParams, *connParams, *connParams)
config.DB = dbCfg
c := &ReplicaConnector{conn: connParams}
env := tabletenv.NewEnv(config, "source")
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl
if !vse.isOpen {
return nil, 0, errors.New("VStreamer is not open")
}
streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.AppWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, send)
streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, send)
idx := vse.streamIdx
vse.streamers[idx] = streamer
vse.streamIdx++
Expand Down Expand Up @@ -252,7 +252,7 @@ func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltyp
return nil, 0, errors.New("VStreamer is not open")
}

rowStreamer := newRowStreamer(ctx, vse.env.Config().DB.AppWithDB(), vse.se, query, lastpk, vse.lvschema, send, vse)
rowStreamer := newRowStreamer(ctx, vse.env.Config().DB.FilteredWithDB(), vse.se, query, lastpk, vse.lvschema, send, vse)
idx := vse.streamIdx
vse.rowStreamers[idx] = rowStreamer
vse.streamIdx++
Expand Down Expand Up @@ -286,7 +286,7 @@ func (vse *Engine) StreamResults(ctx context.Context, query string, send func(*b
if !vse.isOpen {
return nil, 0, errors.New("VStreamer is not open")
}
resultStreamer := newResultStreamer(ctx, vse.env.Config().DB.AppWithDB(), query, send, vse)
resultStreamer := newResultStreamer(ctx, vse.env.Config().DB.FilteredWithDB(), query, send, vse)
idx := vse.streamIdx
vse.resultStreamers[idx] = resultStreamer
vse.streamIdx++
Expand Down

0 comments on commit 93f4490

Please sign in to comment.