This repository has been archived by the owner on Dec 16, 2022. It is now read-only.
forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'vtqueryserver' into slack-sync-upstream-2018-01-29
- Loading branch information
Showing
12 changed files
with
1,296 additions
and
10 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
Copyright 2017 Google Inc. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package main | ||
|
||
import ( | ||
"net/http" | ||
) | ||
|
||
// This is a separate file so it can be selectively included/excluded from | ||
// builds to opt in/out of the redirect. | ||
|
||
func init() { | ||
// Anything unrecognized gets redirected to the status page. | ||
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { | ||
http.Redirect(w, r, "/debug/status", http.StatusFound) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
Copyright 2017 Google Inc. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreedto in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package main | ||
|
||
// This plugin imports staticauthserver to register the flat-file implementation of AuthServer. | ||
|
||
import ( | ||
"github.com/youtube/vitess/go/mysql" | ||
"github.com/youtube/vitess/go/vt/vtqueryserver" | ||
) | ||
|
||
func init() { | ||
vtqueryserver.RegisterPluginInitializer(func() { mysql.InitAuthServerStatic() }) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/* | ||
Copyright 2017 Google Inc. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package main | ||
|
||
import ( | ||
"github.com/youtube/vitess/go/vt/servenv" | ||
"github.com/youtube/vitess/go/vt/vttablet/grpcqueryservice" | ||
"github.com/youtube/vitess/go/vt/vttablet/tabletserver" | ||
) | ||
|
||
// Imports and register the gRPC queryservice server | ||
|
||
func init() { | ||
tabletserver.RegisterFunctions = append(tabletserver.RegisterFunctions, func(qsc tabletserver.Controller) { | ||
if servenv.GRPCCheckServiceMap("queryservice") { | ||
grpcqueryservice.Register(servenv.GRPCServer, qsc.QueryService()) | ||
} | ||
}) | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
/* | ||
Copyright 2017 Google Inc. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package main | ||
|
||
import ( | ||
"flag" | ||
"os" | ||
|
||
log "github.com/golang/glog" | ||
"github.com/youtube/vitess/go/vt/dbconfigs" | ||
"github.com/youtube/vitess/go/vt/servenv" | ||
"github.com/youtube/vitess/go/vt/vtqueryserver" | ||
"github.com/youtube/vitess/go/vt/vttablet/tabletserver/tabletenv" | ||
) | ||
|
||
var ( | ||
mysqlSocketFile = flag.String("mysql-socket-file", "", "path to unix socket file to connect to mysql") | ||
) | ||
|
||
func init() { | ||
servenv.RegisterDefaultFlags() | ||
} | ||
|
||
func main() { | ||
dbconfigFlags := dbconfigs.AppConfig | dbconfigs.AppDebugConfig | ||
dbconfigs.RegisterFlags(dbconfigFlags) | ||
flag.Parse() | ||
|
||
if *servenv.Version { | ||
servenv.AppVersion.Print() | ||
os.Exit(0) | ||
} | ||
|
||
if len(flag.Args()) > 0 { | ||
flag.Usage() | ||
log.Exit("vtqueryserver doesn't take any positional arguments") | ||
} | ||
if err := tabletenv.VerifyConfig(); err != nil { | ||
log.Exitf("invalid config: %v", err) | ||
} | ||
|
||
tabletenv.Init() | ||
|
||
servenv.Init() | ||
|
||
dbcfgs, err := dbconfigs.Init(*mysqlSocketFile, dbconfigFlags) | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
err = vtqueryserver.Init(dbcfgs) | ||
if err != nil { | ||
log.Exitf("error initializing proxy: %v", err) | ||
} | ||
|
||
servenv.RunDefault() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,226 @@ | ||
/* | ||
Copyright 2017 Google Inc. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
// Package mysqlproxy is a basic module that proxies a mysql server | ||
// session to appropriate calls in a queryservice back end, with optional | ||
// query normalization. | ||
package mysqlproxy | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
log "github.com/golang/glog" | ||
|
||
"github.com/youtube/vitess/go/sqltypes" | ||
"github.com/youtube/vitess/go/vt/sqlparser" | ||
"github.com/youtube/vitess/go/vt/vttablet/queryservice" | ||
|
||
querypb "github.com/youtube/vitess/go/vt/proto/query" | ||
) | ||
|
||
// ProxySession holds session state for the proxy | ||
type ProxySession struct { | ||
TransactionID int64 | ||
TargetString string | ||
Options *querypb.ExecuteOptions | ||
Autocommit bool | ||
} | ||
|
||
// Proxy wraps the standalone query service | ||
type Proxy struct { | ||
target *querypb.Target | ||
qs queryservice.QueryService | ||
normalize bool | ||
} | ||
|
||
// NewProxy creates a new proxy | ||
func NewProxy(target *querypb.Target, qs queryservice.QueryService, normalize bool) *Proxy { | ||
return &Proxy{ | ||
target: target, | ||
qs: qs, | ||
normalize: normalize, | ||
} | ||
} | ||
|
||
// Execute runs the given sql query in the specified session | ||
func (mp *Proxy) Execute(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*ProxySession, *sqltypes.Result, error) { | ||
var err error | ||
result := &sqltypes.Result{} | ||
|
||
switch sqlparser.Preview(sql) { | ||
case sqlparser.StmtBegin: | ||
err = mp.doBegin(ctx, session) | ||
case sqlparser.StmtCommit: | ||
err = mp.doCommit(ctx, session) | ||
case sqlparser.StmtRollback: | ||
err = mp.doRollback(ctx, session) | ||
case sqlparser.StmtSet: | ||
result, err = mp.doSet(ctx, session, sql, bindVariables) | ||
case sqlparser.StmtInsert, sqlparser.StmtUpdate, sqlparser.StmtDelete, sqlparser.StmtReplace: | ||
result, err = mp.executeDML(ctx, session, sql, bindVariables) | ||
case sqlparser.StmtSelect: | ||
result, err = mp.executeSelect(ctx, session, sql, bindVariables) | ||
default: | ||
result, err = mp.executeOther(ctx, session, sql, bindVariables) | ||
} | ||
|
||
if err != nil { | ||
return nil, nil, err | ||
} | ||
|
||
return session, result, nil | ||
} | ||
|
||
// Rollback rolls back the session | ||
func (mp *Proxy) Rollback(ctx context.Context, session *ProxySession) error { | ||
return mp.doRollback(ctx, session) | ||
} | ||
|
||
func (mp *Proxy) doBegin(ctx context.Context, session *ProxySession) error { | ||
txID, err := mp.qs.Begin(ctx, mp.target, session.Options) | ||
if err != nil { | ||
return err | ||
} | ||
session.TransactionID = txID | ||
return nil | ||
} | ||
|
||
func (mp *Proxy) doCommit(ctx context.Context, session *ProxySession) error { | ||
if session.TransactionID == 0 { | ||
return fmt.Errorf("commit: no open transaction") | ||
|
||
} | ||
err := mp.qs.Commit(ctx, mp.target, session.TransactionID) | ||
session.TransactionID = 0 | ||
return err | ||
} | ||
|
||
// Rollback rolls back the session | ||
func (mp *Proxy) doRollback(ctx context.Context, session *ProxySession) error { | ||
if session.TransactionID != 0 { | ||
err := mp.qs.Rollback(ctx, mp.target, session.TransactionID) | ||
session.TransactionID = 0 | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
// Set is currently ignored | ||
func (mp *Proxy) doSet(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { | ||
vals, charset, _, err := sqlparser.ExtractSetValues(sql) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if len(vals) > 0 && charset != "" { | ||
return nil, err | ||
} | ||
|
||
switch charset { | ||
case "", "utf8", "utf8mb4", "latin1", "default": | ||
break | ||
default: | ||
return nil, fmt.Errorf("unexpected value for charset: %v", charset) | ||
} | ||
|
||
for k, v := range vals { | ||
switch k { | ||
case "autocommit": | ||
val, ok := v.(int64) | ||
if !ok { | ||
return nil, fmt.Errorf("unexpected value type for autocommit: %T", v) | ||
} | ||
switch val { | ||
case 0: | ||
session.Autocommit = false | ||
case 1: | ||
if session.TransactionID != 0 { | ||
if err := mp.doCommit(ctx, session); err != nil { | ||
return nil, err | ||
} | ||
} | ||
session.Autocommit = true | ||
default: | ||
return nil, fmt.Errorf("unexpected value for autocommit: %d", val) | ||
} | ||
default: | ||
log.Warningf("Ignored inapplicable SET %v = %v", k, v) | ||
} | ||
} | ||
|
||
return &sqltypes.Result{}, nil | ||
} | ||
|
||
// executeSelect runs the given select statement | ||
func (mp *Proxy) executeSelect(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { | ||
if mp.normalize { | ||
query, comments := sqlparser.SplitTrailingComments(sql) | ||
stmt, err := sqlparser.Parse(query) | ||
if err != nil { | ||
fmt.Printf("YYY parse error %s\n", query) | ||
return nil, err | ||
} | ||
sqlparser.Normalize(stmt, bindVariables, "vtp") | ||
normalized := sqlparser.String(stmt) | ||
sql = normalized + comments | ||
} | ||
|
||
return mp.qs.Execute(ctx, mp.target, sql, bindVariables, session.TransactionID, session.Options) | ||
} | ||
|
||
// executeDML runs the given query handling autocommit semantics | ||
func (mp *Proxy) executeDML(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { | ||
if mp.normalize { | ||
query, comments := sqlparser.SplitTrailingComments(sql) | ||
stmt, err := sqlparser.Parse(query) | ||
if err != nil { | ||
return nil, err | ||
} | ||
sqlparser.Normalize(stmt, bindVariables, "vtp") | ||
normalized := sqlparser.String(stmt) | ||
sql = normalized + comments | ||
} | ||
|
||
if session.TransactionID != 0 { | ||
return mp.qs.Execute(ctx, mp.target, sql, bindVariables, session.TransactionID, session.Options) | ||
|
||
} else if session.Autocommit { | ||
queries := []*querypb.BoundQuery{{ | ||
Sql: sql, | ||
BindVariables: bindVariables, | ||
}} | ||
|
||
// This is a stopgap until there is a better way to do autocommit | ||
results, err := mp.qs.ExecuteBatch(ctx, mp.target, queries, true /* asTransaction */, 0, session.Options) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &results[0], nil | ||
|
||
} else { | ||
result, txnID, err := mp.qs.BeginExecute(ctx, mp.target, sql, bindVariables, session.Options) | ||
if err != nil { | ||
return nil, err | ||
} | ||
session.TransactionID = txnID | ||
return result, nil | ||
} | ||
} | ||
|
||
// executeOther runs the given other statement bypassing the normalizer | ||
func (mp *Proxy) executeOther(ctx context.Context, session *ProxySession, sql string, bindVariables map[string]*querypb.BindVariable) (*sqltypes.Result, error) { | ||
return mp.qs.Execute(ctx, mp.target, sql, bindVariables, session.TransactionID, session.Options) | ||
} |
Oops, something went wrong.