From 8f0bc14aafc563c7e23c566170f9b4f81a2db8e0 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Wed, 3 Jan 2018 11:46:10 -0800 Subject: [PATCH 01/11] add an error log if creating the app connection fails --- go/vt/vttablet/tabletserver/tabletserver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index f5add2a1eb7..3d3c3572ffc 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -440,6 +440,7 @@ func (tsv *TabletServer) decideAction(tabletType topodatapb.TabletType, serving func (tsv *TabletServer) fullStart() (err error) { c, err := dbconnpool.NewDBConnection(&tsv.dbconfigs.App, tabletenv.MySQLStats) if err != nil { + log.Errorf("error creating db app connection: %v", err) return err } c.Close() From f786a06df288bd096c78677591afff2708a4234b Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Mon, 8 Jan 2018 09:25:45 -0800 Subject: [PATCH 02/11] add a reusable mysqlproxy module Modeling after the vtgate mysql server implementation, add a simple module that implements the mysql server protocol handlers and proxies all requests to the appropriate methods in a queryservice backend. This includes optional support for query normalization. --- go/vt/mysqlproxy/mysqlproxy.go | 156 +++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) create mode 100644 go/vt/mysqlproxy/mysqlproxy.go diff --git a/go/vt/mysqlproxy/mysqlproxy.go b/go/vt/mysqlproxy/mysqlproxy.go new file mode 100644 index 00000000000..9503b572c6a --- /dev/null +++ b/go/vt/mysqlproxy/mysqlproxy.go @@ -0,0 +1,156 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package mysqlproxy is a basic module that proxies a mysql server +// session to appropriate calls in a queryservice back end, with optional +// query normalization. +package mysqlproxy + +import ( + "context" + "fmt" + + log "github.com/golang/glog" + + "github.com/youtube/vitess/go/sqltypes" + "github.com/youtube/vitess/go/vt/sqlparser" + "github.com/youtube/vitess/go/vt/vttablet/queryservice" + + querypb "github.com/youtube/vitess/go/vt/proto/query" +) + +// ProxySession holds session state for the proxy +type ProxySession struct { + TransactionID int64 + TargetString string + Options *querypb.ExecuteOptions + Autocommit bool +} + +// Proxy wraps the standalone query service +type Proxy struct { + target *querypb.Target + qs queryservice.QueryService + normalize bool +} + +// NewProxy creates a new proxy +func NewProxy(target *querypb.Target, qs queryservice.QueryService, normalize bool) *Proxy { + return &Proxy{ + target: target, + qs: qs, + normalize: normalize, + } +} + +// Execute runs the given sql query in the specified session +func (mp *Proxy) Execute(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*ProxySession, *sqltypes.Result, error) { + var err error + result := &sqltypes.Result{} + + switch sqlparser.Preview(sql) { + case sqlparser.StmtBegin: + err = mp.doBegin(ctx, session) + case sqlparser.StmtCommit: + err = mp.doCommit(ctx, session) + case sqlparser.StmtRollback: + err = mp.doRollback(ctx, session) + case sqlparser.StmtSet: + result, err = mp.doSet(ctx, session, sql, bindVariables) + default: + result, err = mp.doExecute(ctx, session, sql, bindVariables) + } + + if err != nil { + return nil, nil, err + } + + return session, result, nil +} + +// Rollback rolls back the session +func (mp *Proxy) Rollback(ctx context.Context, session *ProxySession) error { + return mp.doRollback(ctx, session) +} + +func (mp *Proxy) doBegin(ctx context.Context, session *ProxySession) error { + txID, err := mp.qs.Begin(ctx, mp.target, session.Options) + if err != nil { + return err + } + session.TransactionID = txID + return nil +} + +func (mp *Proxy) doCommit(ctx context.Context, session *ProxySession) error { + if session.TransactionID == 0 { + return fmt.Errorf("commit: no open transaction") + + } + err := mp.qs.Commit(ctx, mp.target, session.TransactionID) + session.TransactionID = 0 + return err +} + +// Rollback rolls back the session +func (mp *Proxy) doRollback(ctx context.Context, session *ProxySession) error { + if session.TransactionID != 0 { + err := mp.qs.Rollback(ctx, mp.target, session.TransactionID) + session.TransactionID = 0 + return err + } + return nil +} + +// Set is currently ignored +func (mp *Proxy) doSet(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + vals, charset, err := sqlparser.ExtractSetValues(sql) + if err != nil { + return nil, err + } + if len(vals) > 0 && charset != "" { + return nil, err + } + + switch charset { + case "", "utf8", "utf8mb4", "latin1", "default": + break + default: + return nil, fmt.Errorf("unexpected value for charset: %v", charset) + } + + for k, v := range vals { + log.Warningf("Ignored inapplicable SET %v = %v", k, v) + } + + return &sqltypes.Result{}, nil +} + +// doExecute runs the given query +func (mp *Proxy) doExecute(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + if mp.normalize { + query, comments := sqlparser.SplitTrailingComments(sql) + stmt, err := sqlparser.Parse(query) + if err != nil { + return nil, err + } + sqlparser.Normalize(stmt, bindVariables, "vtp") + normalized := sqlparser.String(stmt) + sql = normalized + comments + } + + return mp.qs.Execute(ctx, mp.target, sql, bindVariables, session.TransactionID, session.Options) +} From 5e7254ee614dac092de3c319fffb0d577ce639e4 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Mon, 8 Jan 2018 13:19:08 -0800 Subject: [PATCH 03/11] add vtqueryserver Add a binary to wrap a standalone queryservice with a grpc service interface and a mysql server implementation using the mysqlproxy module to direct to the queryservice. --- go/cmd/vtqueryserver/index.go | 31 +++ go/cmd/vtqueryserver/plugin_auth_static.go | 28 +++ .../vtqueryserver/plugin_grpcqueryservice.go | 34 +++ go/cmd/vtqueryserver/vtqueryserver.go | 71 ++++++ go/vt/vtqueryserver/plugin_mysql_server.go | 236 ++++++++++++++++++ .../vtqueryserver/plugin_mysql_server_test.go | 157 ++++++++++++ go/vt/vtqueryserver/status.go | 90 +++++++ go/vt/vtqueryserver/vtqueryserver.go | 78 ++++++ 8 files changed, 725 insertions(+) create mode 100644 go/cmd/vtqueryserver/index.go create mode 100644 go/cmd/vtqueryserver/plugin_auth_static.go create mode 100644 go/cmd/vtqueryserver/plugin_grpcqueryservice.go create mode 100644 go/cmd/vtqueryserver/vtqueryserver.go create mode 100644 go/vt/vtqueryserver/plugin_mysql_server.go create mode 100644 go/vt/vtqueryserver/plugin_mysql_server_test.go create mode 100644 go/vt/vtqueryserver/status.go create mode 100644 go/vt/vtqueryserver/vtqueryserver.go diff --git a/go/cmd/vtqueryserver/index.go b/go/cmd/vtqueryserver/index.go new file mode 100644 index 00000000000..72b2637abf0 --- /dev/null +++ b/go/cmd/vtqueryserver/index.go @@ -0,0 +1,31 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "net/http" +) + +// This is a separate file so it can be selectively included/excluded from +// builds to opt in/out of the redirect. + +func init() { + // Anything unrecognized gets redirected to the status page. + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + http.Redirect(w, r, "/debug/status", http.StatusFound) + }) +} diff --git a/go/cmd/vtqueryserver/plugin_auth_static.go b/go/cmd/vtqueryserver/plugin_auth_static.go new file mode 100644 index 00000000000..f95882338c7 --- /dev/null +++ b/go/cmd/vtqueryserver/plugin_auth_static.go @@ -0,0 +1,28 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreedto in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +// This plugin imports staticauthserver to register the flat-file implementation of AuthServer. + +import ( + "github.com/youtube/vitess/go/mysql" + "github.com/youtube/vitess/go/vt/vtqueryserver" +) + +func init() { + vtqueryserver.RegisterPluginInitializer(func() { mysql.InitAuthServerStatic() }) +} diff --git a/go/cmd/vtqueryserver/plugin_grpcqueryservice.go b/go/cmd/vtqueryserver/plugin_grpcqueryservice.go new file mode 100644 index 00000000000..0580c5310b3 --- /dev/null +++ b/go/cmd/vtqueryserver/plugin_grpcqueryservice.go @@ -0,0 +1,34 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "github.com/youtube/vitess/go/vt/servenv" + "github.com/youtube/vitess/go/vt/vttablet/grpcqueryservice" + "github.com/youtube/vitess/go/vt/vttablet/tabletserver" +) + +// Imports and register the gRPC queryservice server + +func init() { + tabletserver.RegisterFunctions = append(tabletserver.RegisterFunctions, func(qsc tabletserver.Controller) { + if servenv.GRPCCheckServiceMap("queryservice") { + grpcqueryservice.Register(servenv.GRPCServer, qsc.QueryService()) + } + }) + +} diff --git a/go/cmd/vtqueryserver/vtqueryserver.go b/go/cmd/vtqueryserver/vtqueryserver.go new file mode 100644 index 00000000000..eed1ae3a77d --- /dev/null +++ b/go/cmd/vtqueryserver/vtqueryserver.go @@ -0,0 +1,71 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "os" + + log "github.com/golang/glog" + "github.com/youtube/vitess/go/vt/dbconfigs" + "github.com/youtube/vitess/go/vt/servenv" + "github.com/youtube/vitess/go/vt/vtqueryserver" + "github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv" +) + +var ( + mysqlSocketFile = flag.String("mysql-socket-file", "", "path to unix socket file to connect to mysql") +) + +func init() { + servenv.RegisterDefaultFlags() +} + +func main() { + dbconfigFlags := dbconfigs.AppConfig | dbconfigs.AppDebugConfig + dbconfigs.RegisterFlags(dbconfigFlags) + flag.Parse() + + if *servenv.Version { + servenv.AppVersion.Print() + os.Exit(0) + } + + if len(flag.Args()) > 0 { + flag.Usage() + log.Exit("vtqueryserver doesn't take any positional arguments") + } + if err := tabletenv.VerifyConfig(); err != nil { + log.Exitf("invalid config: %v", err) + } + + tabletenv.Init() + + servenv.Init() + + dbcfgs, err := dbconfigs.Init(*mysqlSocketFile, dbconfigFlags) + if err != nil { + log.Fatal(err) + } + + err = vtqueryserver.Init(dbcfgs) + if err != nil { + log.Exitf("error initializing proxy: %v", err) + } + + servenv.RunDefault() +} diff --git a/go/vt/vtqueryserver/plugin_mysql_server.go b/go/vt/vtqueryserver/plugin_mysql_server.go new file mode 100644 index 00000000000..41d39af98ab --- /dev/null +++ b/go/vt/vtqueryserver/plugin_mysql_server.go @@ -0,0 +1,236 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreedto in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtqueryserver + +import ( + "flag" + "fmt" + "net" + "os" + "syscall" + + log "github.com/golang/glog" + "golang.org/x/net/context" + + "github.com/youtube/vitess/go/mysql" + "github.com/youtube/vitess/go/sqltypes" + "github.com/youtube/vitess/go/vt/callerid" + "github.com/youtube/vitess/go/vt/mysqlproxy" + "github.com/youtube/vitess/go/vt/servenv" + "github.com/youtube/vitess/go/vt/vttls" + + querypb "github.com/youtube/vitess/go/vt/proto/query" +) + +var ( + mysqlServerPort = flag.Int("mysqlproxy_server_port", -1, "If set, also listen for MySQL binary protocol connections on this port.") + mysqlServerBindAddress = flag.String("mysqlproxy_server_bind_address", "", "Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance.") + mysqlServerSocketPath = flag.String("mysqlproxy_server_socket_path", "", "This option specifies the Unix socket file to use when listening for local connections. By default it will be empty and it won't listen to a unix socket") + mysqlAuthServerImpl = flag.String("mysql_auth_server_impl", "static", "Which auth server implementation to use.") + mysqlAllowClearTextWithoutTLS = flag.Bool("mysql_allow_clear_text_without_tls", false, "If set, the server will allow the use of a clear text password over non-SSL connections.") + + mysqlSslCert = flag.String("mysqlproxy_server_ssl_cert", "", "Path to the ssl cert for mysql server plugin SSL") + mysqlSslKey = flag.String("mysqlproxy_server_ssl_key", "", "Path to ssl key for mysql server plugin SSL") + mysqlSslCa = flag.String("mysqlproxy_server_ssl_ca", "", "Path to ssl CA for mysql server plugin SSL. If specified, server will require and validate client certs.") + + mysqlSlowConnectWarnThreshold = flag.Duration("mysqlproxy_slow_connect_warn_threshold", 0, "Warn if it takes more than the given threshold for a mysql connection to establish") +) + +// proxyHandler implements the Listener interface. +// It stores the Session in the ClientData of a Connection, if a transaction +// is in progress. +type proxyHandler struct { + mp *mysqlproxy.Proxy +} + +func newProxyHandler(mp *mysqlproxy.Proxy) *proxyHandler { + return &proxyHandler{ + mp: mp, + } +} + +func (mh *proxyHandler) NewConnection(c *mysql.Conn) { +} + +func (mh *proxyHandler) ConnectionClosed(c *mysql.Conn) { + // Rollback if there is an ongoing transaction. Ignore error. + ctx := context.Background() + session, _ := c.ClientData.(*mysqlproxy.ProxySession) + if session != nil && session.TransactionID != 0 { + _ = mh.mp.Rollback(ctx, session) + } +} + +func (mh *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error { + // FIXME(alainjobart): Add some kind of timeout to the context. + ctx := context.Background() + + // Fill in the ImmediateCallerID with the UserData returned by + // the AuthServer plugin for that user. If nothing was + // returned, use the User. This lets the plugin map a MySQL + // user used for authentication to a Vitess User used for + // Table ACLs and Vitess authentication in general. + im := c.UserData.Get() + ef := callerid.NewEffectiveCallerID( + c.User, /* principal: who */ + c.RemoteAddr().String(), /* component: running client process */ + "mysqlproxy MySQL Connector" /* subcomponent: part of the client */) + ctx = callerid.NewContext(ctx, ef, im) + + session, _ := c.ClientData.(*mysqlproxy.ProxySession) + if session == nil { + session = &mysqlproxy.ProxySession{ + Options: &querypb.ExecuteOptions{ + IncludedFields: querypb.ExecuteOptions_ALL, + }, + Autocommit: true, + } + if c.Capabilities&mysql.CapabilityClientFoundRows != 0 { + session.Options.ClientFoundRows = true + } + } + if c.SchemaName != "" { + session.TargetString = c.SchemaName + } + session, result, err := mh.mp.Execute(ctx, session, query, make(map[string]*querypb.BindVariable)) + c.ClientData = session + err = mysql.NewSQLErrorFromError(err) + if err != nil { + return err + } + + return callback(result) +} + +var mysqlListener *mysql.Listener +var mysqlUnixListener *mysql.Listener + +// initiMySQLProtocol starts the mysql protocol. +// It should be called only once in a process. +func initMySQLProtocol() { + log.Infof("initializing mysql protocol") + + // Flag is not set, just return. + if *mysqlServerPort < 0 && *mysqlServerSocketPath == "" { + return + } + + // If no mysqlproxy was created, just return. + if mysqlProxy == nil { + log.Fatalf("mysqlProxy not initialized") + return + } + + // Initialize registered AuthServer implementations (or other plugins) + for _, initFn := range pluginInitializers { + initFn() + } + authServer := mysql.GetAuthServer(*mysqlAuthServerImpl) + + // Create a Listener. + var err error + mh := newProxyHandler(mysqlProxy) + if *mysqlServerPort >= 0 { + mysqlListener, err = mysql.NewListener("tcp", net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), authServer, mh) + if err != nil { + log.Exitf("mysql.NewListener failed: %v", err) + } + if *mysqlSslCert != "" && *mysqlSslKey != "" { + mysqlListener.TLSConfig, err = vttls.ServerConfig(*mysqlSslCert, *mysqlSslKey, *mysqlSslCa) + if err != nil { + log.Exitf("grpcutils.TLSServerConfig failed: %v", err) + return + } + } + mysqlListener.AllowClearTextWithoutTLS = *mysqlAllowClearTextWithoutTLS + + // Check for the connection threshold + if *mysqlSlowConnectWarnThreshold != 0 { + log.Infof("setting mysql slow connection threshold to %v", mysqlSlowConnectWarnThreshold) + mysqlListener.SlowConnectWarnThreshold = *mysqlSlowConnectWarnThreshold + } + // Start listening for tcp + go mysqlListener.Accept() + } + + if *mysqlServerSocketPath != "" { + // Let's create this unix socket with permissions to all users. In this way, + // clients can connect to mysqlproxy mysql server without being mysqlproxy user + oldMask := syscall.Umask(000) + mysqlUnixListener, err = newMysqlUnixSocket(*mysqlServerSocketPath, authServer, mh) + _ = syscall.Umask(oldMask) + if err != nil { + log.Exitf("mysql.NewListener failed: %v", err) + return + } + // Listen for unix socket + go mysqlUnixListener.Accept() + } +} + +// newMysqlUnixSocket creates a new unix socket mysql listener. If a socket file already exists, attempts +// to clean it up. +func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mysql.Handler) (*mysql.Listener, error) { + listener, err := mysql.NewListener("unix", address, authServer, handler) + switch err := err.(type) { + case nil: + return listener, nil + case *net.OpError: + log.Warningf("Found existent socket when trying to create new unix mysql listener: %s, attempting to clean up", address) + // err.Op should never be different from listen, just being extra careful + // in case in the future other errors are returned here + if err.Op != "listen" { + return nil, err + } + _, dialErr := net.Dial("unix", address) + if dialErr == nil { + log.Errorf("Existent socket '%s' is still accepting connections, aborting", address) + return nil, err + } + removeFileErr := os.Remove(address) + if removeFileErr != nil { + log.Errorf("Couldn't remove existent socket file: %s", address) + return nil, err + } + listener, listenerErr := mysql.NewListener("unix", address, authServer, handler) + return listener, listenerErr + default: + return nil, err + } +} + +func init() { + servenv.OnRun(initMySQLProtocol) + + servenv.OnTerm(func() { + if mysqlListener != nil { + mysqlListener.Close() + mysqlListener = nil + } + if mysqlUnixListener != nil { + mysqlUnixListener.Close() + mysqlUnixListener = nil + } + }) +} + +var pluginInitializers []func() + +// RegisterPluginInitializer lets plugins register themselves to be init'ed at servenv.OnRun-time +func RegisterPluginInitializer(initializer func()) { + pluginInitializers = append(pluginInitializers, initializer) +} diff --git a/go/vt/vtqueryserver/plugin_mysql_server_test.go b/go/vt/vtqueryserver/plugin_mysql_server_test.go new file mode 100644 index 00000000000..487aa51c36a --- /dev/null +++ b/go/vt/vtqueryserver/plugin_mysql_server_test.go @@ -0,0 +1,157 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreedto in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtqueryserver + +import ( + "io/ioutil" + "os" + "strings" + "testing" + + "golang.org/x/net/context" + + "github.com/youtube/vitess/go/mysql" + "github.com/youtube/vitess/go/sqltypes" +) + +type testHandler struct { + lastConn *mysql.Conn +} + +func (th *testHandler) NewConnection(c *mysql.Conn) { + th.lastConn = c +} + +func (th *testHandler) ConnectionClosed(c *mysql.Conn) { +} + +func (th *testHandler) ComQuery(c *mysql.Conn, q string, callback func(*sqltypes.Result) error) error { + return nil +} + +func TestConnectionUnixSocket(t *testing.T) { + th := &testHandler{} + + authServer := mysql.NewAuthServerStatic() + + authServer.Entries["user1"] = []*mysql.AuthServerStaticEntry{ + { + Password: "password1", + UserData: "userData1", + SourceHost: "localhost", + }, + } + + // Use tmp file to reserve a path, remove it immediately, we only care about + // name in this context + unixSocket, err := ioutil.TempFile("", "mysql_vitess_test.sock") + if err != nil { + t.Fatalf("Failed to create temp file") + } + os.Remove(unixSocket.Name()) + + l, err := newMysqlUnixSocket(unixSocket.Name(), authServer, th) + if err != nil { + t.Fatalf("NewUnixSocket failed: %v", err) + } + defer l.Close() + go l.Accept() + + params := &mysql.ConnParams{ + UnixSocket: unixSocket.Name(), + Uname: "user1", + Pass: "password1", + } + + c, err := mysql.Connect(context.Background(), params) + if err != nil { + t.Errorf("Should be able to connect to server but found error: %v", err) + } + c.Close() +} + +func TestConnectionStaleUnixSocket(t *testing.T) { + th := &testHandler{} + + authServer := mysql.NewAuthServerStatic() + + authServer.Entries["user1"] = []*mysql.AuthServerStaticEntry{ + { + Password: "password1", + UserData: "userData1", + SourceHost: "localhost", + }, + } + + // First let's create a file. In this way, we simulate + // having a stale socket on disk that needs to be cleaned up. + unixSocket, err := ioutil.TempFile("", "mysql_vitess_test.sock") + if err != nil { + t.Fatalf("Failed to create temp file") + } + + l, err := newMysqlUnixSocket(unixSocket.Name(), authServer, th) + if err != nil { + t.Fatalf("NewListener failed: %v", err) + } + defer l.Close() + go l.Accept() + + params := &mysql.ConnParams{ + UnixSocket: unixSocket.Name(), + Uname: "user1", + Pass: "password1", + } + + c, err := mysql.Connect(context.Background(), params) + if err != nil { + t.Errorf("Should be able to connect to server but found error: %v", err) + } + c.Close() +} + +func TestConnectionRespectsExistingUnixSocket(t *testing.T) { + th := &testHandler{} + + authServer := mysql.NewAuthServerStatic() + + authServer.Entries["user1"] = []*mysql.AuthServerStaticEntry{ + { + Password: "password1", + UserData: "userData1", + SourceHost: "localhost", + }, + } + + unixSocket, err := ioutil.TempFile("", "mysql_vitess_test.sock") + if err != nil { + t.Fatalf("Failed to create temp file") + } + os.Remove(unixSocket.Name()) + + l, err := newMysqlUnixSocket(unixSocket.Name(), authServer, th) + if err != nil { + t.Errorf("NewListener failed: %v", err) + } + defer l.Close() + go l.Accept() + _, err = newMysqlUnixSocket(unixSocket.Name(), authServer, th) + want := "listen unix" + if err == nil || !strings.HasPrefix(err.Error(), want) { + t.Errorf("Error: %v, want prefix %s", err, want) + } +} diff --git a/go/vt/vtqueryserver/status.go b/go/vt/vtqueryserver/status.go new file mode 100644 index 00000000000..04606919d9e --- /dev/null +++ b/go/vt/vtqueryserver/status.go @@ -0,0 +1,90 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreedto in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtqueryserver + +import ( + "github.com/youtube/vitess/go/vt/servenv" + "github.com/youtube/vitess/go/vt/vttablet/tabletserver" +) + +var ( + // proxyTemplate contains the style sheet and the tablet itself. + proxyTemplate = ` + + + + + + + + +
+ Target Keyspace: {{.Target.Keyspace}}
+
+ Schema
+ Schema Query Plans
+ Schema Query Stats
+ Schema Table Stats
+
+ Query Stats
+ Streaming Query Stats
+ Consolidations
+ Current Query Log
+ Current Transaction Log
+ In-flight 2PC Transactions
+
+ Query Service Health Check
+ Current Stream Queries
+
+` +) + +// For use by plugins which wish to avoid racing when registering status page parts. +var onStatusRegistered func() + +func addStatusParts(qsc tabletserver.Controller) { + servenv.AddStatusPart("Target", proxyTemplate, func() interface{} { + return map[string]interface{}{ + "Target": target, + } + }) + qsc.AddStatusPart() + if onStatusRegistered != nil { + onStatusRegistered() + } +} diff --git a/go/vt/vtqueryserver/vtqueryserver.go b/go/vt/vtqueryserver/vtqueryserver.go new file mode 100644 index 00000000000..71d8a708b4b --- /dev/null +++ b/go/vt/vtqueryserver/vtqueryserver.go @@ -0,0 +1,78 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package vtqueryserver is a standalone version of the tablet server that +// only implements the queryservice interface without any of the topology, +// replication management, or other features of the full vttablet. +package vtqueryserver + +import ( + "flag" + + log "github.com/golang/glog" + + "github.com/youtube/vitess/go/vt/dbconfigs" + "github.com/youtube/vitess/go/vt/mysqlproxy" + "github.com/youtube/vitess/go/vt/servenv" + "github.com/youtube/vitess/go/vt/vttablet/tabletserver" + "github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv" + + querypb "github.com/youtube/vitess/go/vt/proto/query" + topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" +) + +var ( + mysqlProxy *mysqlproxy.Proxy + target = querypb.Target{ + TabletType: topodatapb.TabletType_MASTER, + Keyspace: "", + } + + targetKeyspace = flag.String("target", "", "Target database name") + normalizeQueries = flag.Bool("normalize_queries", true, "Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars.") +) + +// Init initializes the proxy +func Init(dbcfgs *dbconfigs.DBConfigs) error { + target.Keyspace = *targetKeyspace + log.Infof("initalizing vtqueryserver.Proxy for target %s", target.Keyspace) + + // force autocommit to be enabled + tabletenv.Config.EnableAutoCommit = true + + // creates and registers the query service + qs := tabletserver.NewTabletServerWithNilTopoServer(tabletenv.Config) + + mysqlProxy = mysqlproxy.NewProxy(&target, qs, *normalizeQueries) + + servenv.OnRun(func() { + qs.Register() + addStatusParts(qs) + }) + + servenv.OnClose(func() { + // We now leave the queryservice running during lameduck, + // so stop it in OnClose(), after lameduck is over. + qs.StopService() + }) + + err := qs.StartService(target, *dbcfgs) + if err != nil { + return err + } + + return nil +} From bb33739fa25259144da4aa3fabff90ec6af8cc29 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Tue, 23 Jan 2018 17:55:23 -0800 Subject: [PATCH 04/11] resolve upstream conflict --- go/vt/mysqlproxy/mysqlproxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/mysqlproxy/mysqlproxy.go b/go/vt/mysqlproxy/mysqlproxy.go index 9503b572c6a..ff9d1c7da96 100644 --- a/go/vt/mysqlproxy/mysqlproxy.go +++ b/go/vt/mysqlproxy/mysqlproxy.go @@ -117,7 +117,7 @@ func (mp *Proxy) doRollback(ctx context.Context, session *ProxySession) error { // Set is currently ignored func (mp *Proxy) doSet(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { - vals, charset, err := sqlparser.ExtractSetValues(sql) + vals, charset, _, err := sqlparser.ExtractSetValues(sql) if err != nil { return nil, err } From e1fdd3e969ae5165a8ce5187388f08d09f15bd15 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 26 Jan 2018 12:20:34 -0800 Subject: [PATCH 05/11] refactor vtqueryserver startup to enable testing --- go/vt/vtqueryserver/plugin_mysql_server.go | 26 +++++++++++++--------- go/vt/vtqueryserver/vtqueryserver.go | 24 +++++++++++++------- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/go/vt/vtqueryserver/plugin_mysql_server.go b/go/vt/vtqueryserver/plugin_mysql_server.go index 41d39af98ab..aa183989e43 100644 --- a/go/vt/vtqueryserver/plugin_mysql_server.go +++ b/go/vt/vtqueryserver/plugin_mysql_server.go @@ -165,6 +165,7 @@ func initMySQLProtocol() { } // Start listening for tcp go mysqlListener.Accept() + log.Infof("listening on %s:%d", *mysqlServerBindAddress, *mysqlServerPort) } if *mysqlServerSocketPath != "" { @@ -213,19 +214,22 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys } } +func shutdownMySQLProtocol() { + log.Infof("shutting down mysql protocol") + if mysqlListener != nil { + mysqlListener.Close() + mysqlListener = nil + } + + if mysqlUnixListener != nil { + mysqlUnixListener.Close() + mysqlUnixListener = nil + } +} + func init() { servenv.OnRun(initMySQLProtocol) - - servenv.OnTerm(func() { - if mysqlListener != nil { - mysqlListener.Close() - mysqlListener = nil - } - if mysqlUnixListener != nil { - mysqlUnixListener.Close() - mysqlUnixListener = nil - } - }) + servenv.OnTerm(shutdownMySQLProtocol) } var pluginInitializers []func() diff --git a/go/vt/vtqueryserver/vtqueryserver.go b/go/vt/vtqueryserver/vtqueryserver.go index 71d8a708b4b..8ac90a4b931 100644 --- a/go/vt/vtqueryserver/vtqueryserver.go +++ b/go/vt/vtqueryserver/vtqueryserver.go @@ -45,8 +45,7 @@ var ( normalizeQueries = flag.Bool("normalize_queries", true, "Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars.") ) -// Init initializes the proxy -func Init(dbcfgs *dbconfigs.DBConfigs) error { +func initProxy(dbcfgs *dbconfigs.DBConfigs) (*tabletserver.TabletServer, error) { target.Keyspace = *targetKeyspace log.Infof("initalizing vtqueryserver.Proxy for target %s", target.Keyspace) @@ -55,9 +54,23 @@ func Init(dbcfgs *dbconfigs.DBConfigs) error { // creates and registers the query service qs := tabletserver.NewTabletServerWithNilTopoServer(tabletenv.Config) - mysqlProxy = mysqlproxy.NewProxy(&target, qs, *normalizeQueries) + err := qs.StartService(target, *dbcfgs) + if err != nil { + return nil, err + } + + return qs, nil +} + +// Init initializes the proxy +func Init(dbcfgs *dbconfigs.DBConfigs) error { + qs, err := initProxy(dbcfgs) + if err != nil { + return err + } + servenv.OnRun(func() { qs.Register() addStatusParts(qs) @@ -69,10 +82,5 @@ func Init(dbcfgs *dbconfigs.DBConfigs) error { qs.StopService() }) - err := qs.StartService(target, *dbcfgs) - if err != nil { - return err - } - return nil } From a75a754e345eff0c2668061c0b1f04a0bdf1556b Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Fri, 26 Jan 2018 12:20:48 -0800 Subject: [PATCH 06/11] add initial end to end test for vtqueryserver --- go/vt/vtqueryserver/endtoend_test.go | 213 +++++++++++++++++++++++++++ 1 file changed, 213 insertions(+) create mode 100644 go/vt/vtqueryserver/endtoend_test.go diff --git a/go/vt/vtqueryserver/endtoend_test.go b/go/vt/vtqueryserver/endtoend_test.go new file mode 100644 index 00000000000..263d7ff3d1e --- /dev/null +++ b/go/vt/vtqueryserver/endtoend_test.go @@ -0,0 +1,213 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package vtqueryserver + +import ( + "context" + "flag" + "fmt" + "os" + "path" + "strings" + "testing" + + "github.com/youtube/vitess/go/mysql" + "github.com/youtube/vitess/go/vt/dbconfigs" + "github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv" + "github.com/youtube/vitess/go/vt/vttest" + + vttestpb "github.com/youtube/vitess/go/vt/proto/vttest" +) + +var ( + mysqlConnParams mysql.ConnParams + proxyConnParams mysql.ConnParams +) + +func TestMain(m *testing.M) { + flag.Parse() // Do not remove this comment, import into google3 depends on it + tabletenv.Init() + + exitCode := func() int { + // Launch MySQL. + // We need a Keyspace in the topology, so the DbName is set. + // We need a Shard too, so the database 'vttest' is created. + cfg := vttest.Config{ + Topology: &vttestpb.VTTestTopology{ + Keyspaces: []*vttestpb.Keyspace{ + { + Name: "vttest", + Shards: []*vttestpb.Shard{ + { + Name: "0", + DbNameOverride: "vttest", + }, + }, + }, + }, + }, + OnlyMySQL: true, + } + if err := cfg.InitSchemas("vttest", testSchema, nil); err != nil { + fmt.Fprintf(os.Stderr, "InitSchemas failed: %v\n", err) + return 1 + } + defer os.RemoveAll(cfg.SchemaDir) + cluster := vttest.LocalCluster{ + Config: cfg, + } + if err := cluster.Setup(); err != nil { + fmt.Fprintf(os.Stderr, "could not launch mysql: %v\n", err) + return 1 + } + defer cluster.TearDown() + + mysqlConnParams = cluster.MySQLConnParams() + + proxySock := path.Join(cluster.Env.Directory(), "mysqlproxy.sock") + + proxyConnParams.UnixSocket = proxySock + proxyConnParams.Uname = "proxy" + proxyConnParams.Pass = "letmein" + + *mysqlServerSocketPath = proxyConnParams.UnixSocket + *mysqlAuthServerImpl = "none" + + dbcfgs := dbconfigs.DBConfigs{ + App: mysqlConnParams, + } + qs, err := initProxy(&dbcfgs) + if err != nil { + fmt.Fprintf(os.Stderr, "could not start proxy: %v\n", err) + return 1 + } + defer qs.StopService() + + initMySQLProtocol() + defer shutdownMySQLProtocol() + + return m.Run() + }() + os.Exit(exitCode) +} + +var testSchema = ` +create table test(id int, val varchar(256), primary key(id)); +create table valtest(intval int default 0, floatval float default null, charval varchar(256) default null, binval varbinary(256) default null, primary key(intval)); +` + +func testFetch(t *testing.T, conn *mysql.Conn, sql string, expectedRows int) { + t.Helper() + + result, err := conn.ExecuteFetch(sql, 1000, true) + if err != nil { + t.Fatalf("error: %v", err) + } + + if len(result.Rows) != expectedRows { + t.Fatalf("expected %d rows but got %d", expectedRows, len(result.Rows)) + } +} + +func testDML(t *testing.T, conn *mysql.Conn, sql string, expectedRows int) { + t.Helper() + + result, err := conn.ExecuteFetch(sql, 1000, true) + if err != nil { + t.Fatalf("error: %v", err) + } + + if int(result.RowsAffected) != expectedRows { + t.Fatalf("expected %d rows affected but got %d", expectedRows, result.RowsAffected) + } +} + +func TestQueries(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &proxyConnParams) + if err != nil { + t.Fatal(err) + } + + // Try a simple query case. + testFetch(t, conn, "select * from test", 0) + + // Try a simple error case. + _, err = conn.ExecuteFetch("select * from aa", 1000, true) + if err == nil || !strings.Contains(err.Error(), "table aa not found in schema") { + t.Fatalf("expected error but got: %v", err) + } +} + +func TestAutocommitDMLs(t *testing.T) { + ctx := context.Background() + + conn, err := mysql.Connect(ctx, &proxyConnParams) + if err != nil { + t.Fatal(err) + } + + conn2, err := mysql.Connect(ctx, &proxyConnParams) + if err != nil { + t.Fatal(err) + } + + testDML(t, conn, "insert into test (id, val) values(1, 'hello')", 1) + + testFetch(t, conn, "select * from test", 1) + testFetch(t, conn2, "select * from test", 1) + + testDML(t, conn, "delete from test", 1) + + testFetch(t, conn, "select * from test", 0) + testFetch(t, conn2, "select * from test", 0) +} + +func TestTransactions(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &proxyConnParams) + if err != nil { + t.Fatal(err) + } + conn2, err := mysql.Connect(ctx, &proxyConnParams) + if err != nil { + t.Fatal(err) + } + + testDML(t, conn, "begin", 0) + testDML(t, conn, "insert into test (id, val) values(1, 'hello')", 1) + testFetch(t, conn, "select * from test", 1) + testFetch(t, conn2, "select * from test", 0) + testDML(t, conn, "commit", 0) + testFetch(t, conn, "select * from test", 1) + testFetch(t, conn2, "select * from test", 1) + + testDML(t, conn, "begin", 0) + testDML(t, conn, "delete from test", 1) + testFetch(t, conn, "select * from test", 0) + testFetch(t, conn2, "select * from test", 1) + testDML(t, conn, "rollback", 0) + + testFetch(t, conn, "select * from test", 1) + testFetch(t, conn2, "select * from test", 1) + + testDML(t, conn2, "begin", 0) + testDML(t, conn2, "delete from test", 1) + testDML(t, conn2, "commit", 0) + + testFetch(t, conn, "select * from test", 0) + testFetch(t, conn2, "select * from test", 0) +} From 45ad9265220c2bda979b3c3d37d8cb9d09632273 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sat, 27 Jan 2018 08:29:52 -0800 Subject: [PATCH 07/11] rework the proxy implementation of autocommit to match mysql Following the example of vtgate, add support to set autocommit on or off in the session, update the execution path to use ExecuteBatch for single round-trip autocommit semantics, and add tests to ensure the behavior matches mysql. --- go/vt/mysqlproxy/mysqlproxy.go | 70 ++++++++++++++++++++++++++-- go/vt/vtqueryserver/endtoend_test.go | 44 +++++++++++++++-- go/vt/vtqueryserver/vtqueryserver.go | 3 -- 3 files changed, 106 insertions(+), 11 deletions(-) diff --git a/go/vt/mysqlproxy/mysqlproxy.go b/go/vt/mysqlproxy/mysqlproxy.go index ff9d1c7da96..23e5e4b7e5c 100644 --- a/go/vt/mysqlproxy/mysqlproxy.go +++ b/go/vt/mysqlproxy/mysqlproxy.go @@ -70,8 +70,10 @@ func (mp *Proxy) Execute(ctx context.Context, session *ProxySession, sql string, err = mp.doRollback(ctx, session) case sqlparser.StmtSet: result, err = mp.doSet(ctx, session, sql, bindVariables) + case sqlparser.StmtSelect: + result, err = mp.doSelect(ctx, session, sql, bindVariables) default: - result, err = mp.doExecute(ctx, session, sql, bindVariables) + result, err = mp.doExecuteDML(ctx, session, sql, bindVariables) } if err != nil { @@ -133,14 +135,35 @@ func (mp *Proxy) doSet(ctx context.Context, session *ProxySession, sql string, b } for k, v := range vals { - log.Warningf("Ignored inapplicable SET %v = %v", k, v) + switch k { + case "autocommit": + val, ok := v.(int64) + if !ok { + return nil, fmt.Errorf("unexpected value type for autocommit: %T", v) + } + switch val { + case 0: + session.Autocommit = false + case 1: + if session.TransactionID != 0 { + if err := mp.doCommit(ctx, session); err != nil { + return nil, err + } + } + session.Autocommit = true + default: + return nil, fmt.Errorf("unexpected value for autocommit: %d", val) + } + default: + log.Warningf("Ignored inapplicable SET %v = %v", k, v) + } } return &sqltypes.Result{}, nil } -// doExecute runs the given query -func (mp *Proxy) doExecute(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { +// doSelect runs the given select +func (mp *Proxy) doSelect(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { if mp.normalize { query, comments := sqlparser.SplitTrailingComments(sql) stmt, err := sqlparser.Parse(query) @@ -154,3 +177,42 @@ func (mp *Proxy) doExecute(ctx context.Context, session *ProxySession, sql strin return mp.qs.Execute(ctx, mp.target, sql, bindVariables, session.TransactionID, session.Options) } + +// doExecuteDML runs the given query handling autocommit semantics +func (mp *Proxy) doExecuteDML(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + if mp.normalize { + query, comments := sqlparser.SplitTrailingComments(sql) + stmt, err := sqlparser.Parse(query) + if err != nil { + return nil, err + } + sqlparser.Normalize(stmt, bindVariables, "vtp") + normalized := sqlparser.String(stmt) + sql = normalized + comments + } + + if session.TransactionID != 0 { + return mp.qs.Execute(ctx, mp.target, sql, bindVariables, session.TransactionID, session.Options) + + } else if session.Autocommit { + queries := []*querypb.BoundQuery{{ + Sql: sql, + BindVariables: bindVariables, + }} + + // This is a stopgap until there is a better way to do autocommit + results, err := mp.qs.ExecuteBatch(ctx, mp.target, queries, true /* asTransaction */, 0, session.Options) + if err != nil { + return nil, err + } + return &results[0], nil + + } else { + result, txnID, err := mp.qs.BeginExecute(ctx, mp.target, sql, bindVariables, session.Options) + if err != nil { + return nil, err + } + session.TransactionID = txnID + return result, nil + } +} diff --git a/go/vt/vtqueryserver/endtoend_test.go b/go/vt/vtqueryserver/endtoend_test.go index 263d7ff3d1e..e25b79afc8b 100644 --- a/go/vt/vtqueryserver/endtoend_test.go +++ b/go/vt/vtqueryserver/endtoend_test.go @@ -114,11 +114,11 @@ func testFetch(t *testing.T, conn *mysql.Conn, sql string, expectedRows int) { result, err := conn.ExecuteFetch(sql, 1000, true) if err != nil { - t.Fatalf("error: %v", err) + t.Errorf("error: %v", err) } if len(result.Rows) != expectedRows { - t.Fatalf("expected %d rows but got %d", expectedRows, len(result.Rows)) + t.Errorf("expected %d rows but got %d", expectedRows, len(result.Rows)) } } @@ -127,11 +127,11 @@ func testDML(t *testing.T, conn *mysql.Conn, sql string, expectedRows int) { result, err := conn.ExecuteFetch(sql, 1000, true) if err != nil { - t.Fatalf("error: %v", err) + t.Errorf("error: %v", err) } if int(result.RowsAffected) != expectedRows { - t.Fatalf("expected %d rows affected but got %d", expectedRows, result.RowsAffected) + t.Errorf("expected %d rows affected but got %d", expectedRows, result.RowsAffected) } } @@ -211,3 +211,39 @@ func TestTransactions(t *testing.T) { testFetch(t, conn, "select * from test", 0) testFetch(t, conn2, "select * from test", 0) } + +func TestNoAutocommit(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &proxyConnParams) + if err != nil { + t.Fatal(err) + } + conn2, err := mysql.Connect(ctx, &proxyConnParams) + if err != nil { + t.Fatal(err) + } + + testDML(t, conn, "set autocommit=0", 0) + + testDML(t, conn, "insert into test (id, val) values(1, 'hello')", 1) + testFetch(t, conn, "select * from test", 1) + testFetch(t, conn2, "select * from test", 0) + testDML(t, conn, "commit", 0) + testFetch(t, conn, "select * from test", 1) + testFetch(t, conn2, "select * from test", 1) + + testDML(t, conn, "delete from test", 1) + testFetch(t, conn, "select * from test", 0) + testFetch(t, conn2, "select * from test", 1) + testDML(t, conn, "rollback", 0) + + testFetch(t, conn, "select * from test", 1) + testFetch(t, conn2, "select * from test", 1) + + testDML(t, conn2, "set autocommit=0", 0) + testDML(t, conn2, "delete from test", 1) + testDML(t, conn2, "commit", 0) + + testFetch(t, conn, "select * from test", 0) + testFetch(t, conn2, "select * from test", 0) +} diff --git a/go/vt/vtqueryserver/vtqueryserver.go b/go/vt/vtqueryserver/vtqueryserver.go index 8ac90a4b931..07750e552b1 100644 --- a/go/vt/vtqueryserver/vtqueryserver.go +++ b/go/vt/vtqueryserver/vtqueryserver.go @@ -49,9 +49,6 @@ func initProxy(dbcfgs *dbconfigs.DBConfigs) (*tabletserver.TabletServer, error) target.Keyspace = *targetKeyspace log.Infof("initalizing vtqueryserver.Proxy for target %s", target.Keyspace) - // force autocommit to be enabled - tabletenv.Config.EnableAutoCommit = true - // creates and registers the query service qs := tabletserver.NewTabletServerWithNilTopoServer(tabletenv.Config) mysqlProxy = mysqlproxy.NewProxy(&target, qs, *normalizeQueries) From 2b3048f58486967949a090816145326cd95bca6e Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sat, 27 Jan 2018 10:10:08 -0800 Subject: [PATCH 08/11] add accessors to set PassthroughDMLs and AllowUnsafeDMLs --- .../vttablet/tabletserver/query_executor_test.go | 16 ++++++---------- go/vt/vttablet/tabletserver/tabletserver.go | 13 +++++++++++++ 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 031a54e3575..8b1fb8b83af 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -108,8 +108,6 @@ func TestQueryExecutorPlanPassDmlRBR(t *testing.T) { func TestQueryExecutorPassthroughDml(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() - planbuilder.PassthroughDMLs = true - defer func() { planbuilder.PassthroughDMLs = false }() query := "update test_table set pk = foo()" want := &sqltypes.Result{} db.AddQuery(query, want) @@ -118,9 +116,8 @@ func TestQueryExecutorPassthroughDml(t *testing.T) { tsv := newTestTabletServer(ctx, noFlags, db) defer tsv.StopService() - planbuilder.PassthroughDMLs = true - defer func() { planbuilder.PassthroughDMLs = false }() - tsv.qe.passthroughDMLs.Set(true) + tsv.SetPassthroughDMLs(true) + defer tsv.SetPassthroughDMLs(false) tsv.qe.binlogFormat = connpool.BinlogFormatRow txid := newTransaction(tsv, nil) @@ -147,7 +144,7 @@ func TestQueryExecutorPassthroughDml(t *testing.T) { t.Errorf("qre.Execute: %v, want %v", code, vtrpcpb.Code_INVALID_ARGUMENT) } - tsv.qe.allowUnsafeDMLs = true + tsv.SetAllowUnsafeDMLs(true) got, err = qre.Execute() if !reflect.DeepEqual(got, want) { t.Fatalf("got: %v, want: %v", got, want) @@ -201,9 +198,8 @@ func TestQueryExecutorPassthroughDmlAutoCommit(t *testing.T) { tsv := newTestTabletServer(ctx, noFlags, db) defer tsv.StopService() - planbuilder.PassthroughDMLs = true - defer func() { planbuilder.PassthroughDMLs = false }() - tsv.qe.passthroughDMLs.Set(true) + tsv.SetPassthroughDMLs(true) + defer tsv.SetPassthroughDMLs(false) tsv.qe.binlogFormat = connpool.BinlogFormatRow qre := newTestQueryExecutor(ctx, tsv, query, 0) @@ -223,7 +219,7 @@ func TestQueryExecutorPassthroughDmlAutoCommit(t *testing.T) { t.Errorf("qre.Execute: %v, want %v", code, vtrpcpb.Code_INVALID_ARGUMENT) } - tsv.qe.allowUnsafeDMLs = true + tsv.SetAllowUnsafeDMLs(true) got, err = qre.Execute() if err != nil { t.Fatalf("qre.Execute() = %v, want nil", err) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 3d3c3572ffc..942075fd8df 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1976,6 +1976,19 @@ func (tsv *TabletServer) MaxDMLRows() int { return int(tsv.qe.maxDMLRows.Get()) } +// SetPassthroughDMLs changes the setting to pass through all DMLs +// It should only be used for testing +func (tsv *TabletServer) SetPassthroughDMLs(val bool) { + planbuilder.PassthroughDMLs = true + tsv.qe.passthroughDMLs.Set(val) +} + +// SetAllowUnsafeDMLs changes the setting to allow unsafe DML statements +// in SBR mode. It should be used only on initialization or for testing. +func (tsv *TabletServer) SetAllowUnsafeDMLs(val bool) { + tsv.qe.allowUnsafeDMLs = val +} + // queryAsString prints a readable version of query+bind variables, // and also truncates data if it's too long func queryAsString(sql string, bindVariables map[string]*querypb.BindVariable) string { From 0ea8013db075f027e994596a351202c8eb389a3e Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sat, 27 Jan 2018 10:11:13 -0800 Subject: [PATCH 09/11] add vtqueryserver tests for passthrough DMLs --- go/vt/vtqueryserver/endtoend_test.go | 102 ++++++++++++++++++++------- 1 file changed, 76 insertions(+), 26 deletions(-) diff --git a/go/vt/vtqueryserver/endtoend_test.go b/go/vt/vtqueryserver/endtoend_test.go index e25b79afc8b..b855559a7e7 100644 --- a/go/vt/vtqueryserver/endtoend_test.go +++ b/go/vt/vtqueryserver/endtoend_test.go @@ -26,6 +26,7 @@ import ( "github.com/youtube/vitess/go/mysql" "github.com/youtube/vitess/go/vt/dbconfigs" + "github.com/youtube/vitess/go/vt/vttablet/tabletserver" "github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv" "github.com/youtube/vitess/go/vt/vttest" @@ -33,6 +34,7 @@ import ( ) var ( + queryServer *tabletserver.TabletServer mysqlConnParams mysql.ConnParams proxyConnParams mysql.ConnParams ) @@ -89,12 +91,14 @@ func TestMain(m *testing.M) { dbcfgs := dbconfigs.DBConfigs{ App: mysqlConnParams, } - qs, err := initProxy(&dbcfgs) + + var err error + queryServer, err = initProxy(&dbcfgs) if err != nil { fmt.Fprintf(os.Stderr, "could not start proxy: %v\n", err) return 1 } - defer qs.StopService() + defer queryServer.StopService() initMySQLProtocol() defer shutdownMySQLProtocol() @@ -112,7 +116,7 @@ create table valtest(intval int default 0, floatval float default null, charval func testFetch(t *testing.T, conn *mysql.Conn, sql string, expectedRows int) { t.Helper() - result, err := conn.ExecuteFetch(sql, 1000, true) + result, err := conn.ExecuteFetch(sql, 1000, false) if err != nil { t.Errorf("error: %v", err) } @@ -122,16 +126,22 @@ func testFetch(t *testing.T, conn *mysql.Conn, sql string, expectedRows int) { } } -func testDML(t *testing.T, conn *mysql.Conn, sql string, expectedRows int) { +func testDML(t *testing.T, conn *mysql.Conn, sql string, expectedNumQueries int64, expectedRowsAffected uint64) { t.Helper() - result, err := conn.ExecuteFetch(sql, 1000, true) + numQueries := tabletenv.MySQLStats.Count() + result, err := conn.ExecuteFetch(sql, 1000, false) if err != nil { t.Errorf("error: %v", err) } + numQueries = tabletenv.MySQLStats.Count() - numQueries + + if numQueries != expectedNumQueries { + t.Errorf("expected %d mysql queries but got %d", expectedNumQueries, numQueries) + } - if int(result.RowsAffected) != expectedRows { - t.Errorf("expected %d rows affected but got %d", expectedRows, result.RowsAffected) + if result.RowsAffected != expectedRowsAffected { + t.Errorf("expected %d rows affected but got %d", expectedRowsAffected, result.RowsAffected) } } @@ -165,12 +175,52 @@ func TestAutocommitDMLs(t *testing.T) { t.Fatal(err) } - testDML(t, conn, "insert into test (id, val) values(1, 'hello')", 1) + testDML(t, conn, "insert into test (id, val) values(1, 'hello')", 3, 1) testFetch(t, conn, "select * from test", 1) testFetch(t, conn2, "select * from test", 1) - testDML(t, conn, "delete from test", 1) + testDML(t, conn, "delete from test", 4, 1) + + testFetch(t, conn, "select * from test", 0) + testFetch(t, conn2, "select * from test", 0) +} + +func TestPassthroughDMLs(t *testing.T) { + ctx := context.Background() + + queryServer.SetPassthroughDMLs(true) + conn, err := mysql.Connect(ctx, &proxyConnParams) + if err != nil { + t.Fatal(err) + } + + conn2, err := mysql.Connect(ctx, &proxyConnParams) + if err != nil { + t.Fatal(err) + } + + testDML(t, conn, "insert into test (id, val) values(1, 'hello')", 3, 1) + testDML(t, conn, "insert into test (id, val) values(2, 'hello')", 3, 1) + testDML(t, conn, "insert into test (id, val) values(3, 'hello')", 3, 1) + + // Subquery DMLs are errors in passthrough mode with SBR, unless + // SetAllowUnsafeDMLs is set + _, err = conn.ExecuteFetch("update test set val='goodbye'", 1000, true) + if err == nil || !strings.Contains(err.Error(), "cannot identify primary key of statement") { + t.Fatalf("expected error but got: %v", err) + } + + queryServer.SetAllowUnsafeDMLs(true) + + // This is 3 queries in passthrough mode and not 4 queries as it would + // be in non-passthrough mode + testDML(t, conn, "update test set val='goodbye'", 3, 3) + + testFetch(t, conn, "select * from test where val='goodbye'", 3) + testFetch(t, conn2, "select * from test where val='goodbye'", 3) + + testDML(t, conn, "delete from test", 4, 3) testFetch(t, conn, "select * from test", 0) testFetch(t, conn2, "select * from test", 0) @@ -187,26 +237,26 @@ func TestTransactions(t *testing.T) { t.Fatal(err) } - testDML(t, conn, "begin", 0) - testDML(t, conn, "insert into test (id, val) values(1, 'hello')", 1) + testDML(t, conn, "begin", 1, 0) + testDML(t, conn, "insert into test (id, val) values(1, 'hello')", 1, 1) testFetch(t, conn, "select * from test", 1) testFetch(t, conn2, "select * from test", 0) - testDML(t, conn, "commit", 0) + testDML(t, conn, "commit", 1, 0) testFetch(t, conn, "select * from test", 1) testFetch(t, conn2, "select * from test", 1) - testDML(t, conn, "begin", 0) - testDML(t, conn, "delete from test", 1) + testDML(t, conn, "begin", 1, 0) + testDML(t, conn, "delete from test", 2, 1) testFetch(t, conn, "select * from test", 0) testFetch(t, conn2, "select * from test", 1) - testDML(t, conn, "rollback", 0) + testDML(t, conn, "rollback", 1, 0) testFetch(t, conn, "select * from test", 1) testFetch(t, conn2, "select * from test", 1) - testDML(t, conn2, "begin", 0) - testDML(t, conn2, "delete from test", 1) - testDML(t, conn2, "commit", 0) + testDML(t, conn2, "begin", 1, 0) + testDML(t, conn2, "delete from test", 2, 1) + testDML(t, conn2, "commit", 1, 0) testFetch(t, conn, "select * from test", 0) testFetch(t, conn2, "select * from test", 0) @@ -223,26 +273,26 @@ func TestNoAutocommit(t *testing.T) { t.Fatal(err) } - testDML(t, conn, "set autocommit=0", 0) + testFetch(t, conn, "set autocommit=0", 0) - testDML(t, conn, "insert into test (id, val) values(1, 'hello')", 1) + testDML(t, conn, "insert into test (id, val) values(1, 'hello')", 2, 1) testFetch(t, conn, "select * from test", 1) testFetch(t, conn2, "select * from test", 0) - testDML(t, conn, "commit", 0) + testDML(t, conn, "commit", 1, 0) testFetch(t, conn, "select * from test", 1) testFetch(t, conn2, "select * from test", 1) - testDML(t, conn, "delete from test", 1) + testDML(t, conn, "delete from test", 3, 1) testFetch(t, conn, "select * from test", 0) testFetch(t, conn2, "select * from test", 1) - testDML(t, conn, "rollback", 0) + testDML(t, conn, "rollback", 1, 0) testFetch(t, conn, "select * from test", 1) testFetch(t, conn2, "select * from test", 1) - testDML(t, conn2, "set autocommit=0", 0) - testDML(t, conn2, "delete from test", 1) - testDML(t, conn2, "commit", 0) + testFetch(t, conn2, "set autocommit=0", 0) + testDML(t, conn2, "delete from test", 3, 1) + testDML(t, conn2, "commit", 1, 0) testFetch(t, conn, "select * from test", 0) testFetch(t, conn2, "select * from test", 0) From bc7ffa9894a7d6b4a49f8a07ae941eb490d4c065 Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Sat, 27 Jan 2018 10:11:48 -0800 Subject: [PATCH 10/11] add vtqueryserver flag for allow_unsafe_dmls --- go/vt/vtqueryserver/vtqueryserver.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/vt/vtqueryserver/vtqueryserver.go b/go/vt/vtqueryserver/vtqueryserver.go index 07750e552b1..a048d0cfeda 100644 --- a/go/vt/vtqueryserver/vtqueryserver.go +++ b/go/vt/vtqueryserver/vtqueryserver.go @@ -43,6 +43,7 @@ var ( targetKeyspace = flag.String("target", "", "Target database name") normalizeQueries = flag.Bool("normalize_queries", true, "Rewrite queries with bind vars. Turn this off if the app itself sends normalized queries with bind vars.") + allowUnsafeDMLs = flag.Bool("allow_unsafe_dmls", false, "Allow passthrough DML statements when running with statement-based replication") ) func initProxy(dbcfgs *dbconfigs.DBConfigs) (*tabletserver.TabletServer, error) { @@ -51,6 +52,7 @@ func initProxy(dbcfgs *dbconfigs.DBConfigs) (*tabletserver.TabletServer, error) // creates and registers the query service qs := tabletserver.NewTabletServerWithNilTopoServer(tabletenv.Config) + qs.SetAllowUnsafeDMLs(*allowUnsafeDMLs) mysqlProxy = mysqlproxy.NewProxy(&target, qs, *normalizeQueries) err := qs.StartService(target, *dbcfgs) From 4a44d25ff4e324887886da7d5708a004782d983b Mon Sep 17 00:00:00 2001 From: Michael Demmer Date: Mon, 29 Jan 2018 14:59:29 -0800 Subject: [PATCH 11/11] add proxy support for other statements This allows EXPLAIN and other such statements to go through without query normalization. --- go/vt/mysqlproxy/mysqlproxy.go | 20 ++++++++++++++------ go/vt/vtqueryserver/endtoend_test.go | 17 ++++++++++++++++- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/go/vt/mysqlproxy/mysqlproxy.go b/go/vt/mysqlproxy/mysqlproxy.go index 23e5e4b7e5c..e89471d7039 100644 --- a/go/vt/mysqlproxy/mysqlproxy.go +++ b/go/vt/mysqlproxy/mysqlproxy.go @@ -70,10 +70,12 @@ func (mp *Proxy) Execute(ctx context.Context, session *ProxySession, sql string, err = mp.doRollback(ctx, session) case sqlparser.StmtSet: result, err = mp.doSet(ctx, session, sql, bindVariables) + case sqlparser.StmtInsert, sqlparser.StmtUpdate, sqlparser.StmtDelete, sqlparser.StmtReplace: + result, err = mp.executeDML(ctx, session, sql, bindVariables) case sqlparser.StmtSelect: - result, err = mp.doSelect(ctx, session, sql, bindVariables) + result, err = mp.executeSelect(ctx, session, sql, bindVariables) default: - result, err = mp.doExecuteDML(ctx, session, sql, bindVariables) + result, err = mp.executeOther(ctx, session, sql, bindVariables) } if err != nil { @@ -162,12 +164,13 @@ func (mp *Proxy) doSet(ctx context.Context, session *ProxySession, sql string, b return &sqltypes.Result{}, nil } -// doSelect runs the given select -func (mp *Proxy) doSelect(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { +// executeSelect runs the given select statement +func (mp *Proxy) executeSelect(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { if mp.normalize { query, comments := sqlparser.SplitTrailingComments(sql) stmt, err := sqlparser.Parse(query) if err != nil { + fmt.Printf("YYY parse error %s\n", query) return nil, err } sqlparser.Normalize(stmt, bindVariables, "vtp") @@ -178,8 +181,8 @@ func (mp *Proxy) doSelect(ctx context.Context, session *ProxySession, sql string return mp.qs.Execute(ctx, mp.target, sql, bindVariables, session.TransactionID, session.Options) } -// doExecuteDML runs the given query handling autocommit semantics -func (mp *Proxy) doExecuteDML(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { +// executeDML runs the given query handling autocommit semantics +func (mp *Proxy) executeDML(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { if mp.normalize { query, comments := sqlparser.SplitTrailingComments(sql) stmt, err := sqlparser.Parse(query) @@ -216,3 +219,8 @@ func (mp *Proxy) doExecuteDML(ctx context.Context, session *ProxySession, sql st return result, nil } } + +// executeOther runs the given other statement bypassing the normalizer +func (mp *Proxy) executeOther(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return mp.qs.Execute(ctx, mp.target, sql, bindVariables, session.TransactionID, session.Options) +} diff --git a/go/vt/vtqueryserver/endtoend_test.go b/go/vt/vtqueryserver/endtoend_test.go index b855559a7e7..b04a0bb8a58 100644 --- a/go/vt/vtqueryserver/endtoend_test.go +++ b/go/vt/vtqueryserver/endtoend_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/youtube/vitess/go/mysql" + "github.com/youtube/vitess/go/sqltypes" "github.com/youtube/vitess/go/vt/dbconfigs" "github.com/youtube/vitess/go/vt/vttablet/tabletserver" "github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -113,7 +114,7 @@ create table test(id int, val varchar(256), primary key(id)); create table valtest(intval int default 0, floatval float default null, charval varchar(256) default null, binval varbinary(256) default null, primary key(intval)); ` -func testFetch(t *testing.T, conn *mysql.Conn, sql string, expectedRows int) { +func testFetch(t *testing.T, conn *mysql.Conn, sql string, expectedRows int) *sqltypes.Result { t.Helper() result, err := conn.ExecuteFetch(sql, 1000, false) @@ -124,6 +125,8 @@ func testFetch(t *testing.T, conn *mysql.Conn, sql string, expectedRows int) { if len(result.Rows) != expectedRows { t.Errorf("expected %d rows but got %d", expectedRows, len(result.Rows)) } + + return result } func testDML(t *testing.T, conn *mysql.Conn, sql string, expectedNumQueries int64, expectedRowsAffected uint64) { @@ -297,3 +300,15 @@ func TestNoAutocommit(t *testing.T) { testFetch(t, conn, "select * from test", 0) testFetch(t, conn2, "select * from test", 0) } + +func TestOther(t *testing.T) { + ctx := context.Background() + conn, err := mysql.Connect(ctx, &proxyConnParams) + if err != nil { + t.Fatal(err) + } + + testFetch(t, conn, "explain select * from test", 1) + testFetch(t, conn, "select table_name, table_rows from information_schema.tables where table_name='test'", 1) + +}