diff --git a/bindinfo/bind_test.go b/bindinfo/bind_test.go new file mode 100644 index 0000000000000..abebc1277395f --- /dev/null +++ b/bindinfo/bind_test.go @@ -0,0 +1,136 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package bindinfo_test + +import ( + "flag" + "fmt" + "os" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/parser" + "github.com/pingcap/tidb/bindinfo" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/mockstore/mocktikv" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/mock" + "github.com/pingcap/tidb/util/testkit" + "github.com/pingcap/tidb/util/testleak" +) + +func TestT(t *testing.T) { + CustomVerboseFlag = true + logLevel := os.Getenv("log_level") + logutil.InitLogger(logutil.NewLogConfig(logLevel, logutil.DefaultLogFormat, "", logutil.EmptyFileLogConfig, false)) + autoid.SetStep(5000) + TestingT(t) +} + +var _ = Suite(&testSuite{}) + +type testSuite struct { + cluster *mocktikv.Cluster + mvccStore mocktikv.MVCCStore + store kv.Storage + domain *domain.Domain + *parser.Parser + ctx *mock.Context +} + +var mockTikv = flag.Bool("mockTikv", true, "use mock tikv store in bind test") + +func (s *testSuite) SetUpSuite(c *C) { + testleak.BeforeTest() + s.Parser = parser.New() + flag.Lookup("mockTikv") + useMockTikv := *mockTikv + if useMockTikv { + s.cluster = mocktikv.NewCluster() + mocktikv.BootstrapWithSingleStore(s.cluster) + s.mvccStore = mocktikv.MustNewMVCCStore() + store, err := mockstore.NewMockTikvStore( + mockstore.WithCluster(s.cluster), + mockstore.WithMVCCStore(s.mvccStore), + ) + c.Assert(err, IsNil) + s.store = store + session.SetSchemaLease(0) + session.SetStatsLease(0) + } + d, err := session.BootstrapSession(s.store) + c.Assert(err, IsNil) + d.SetStatsUpdating(true) + s.domain = d +} + +func (s *testSuite) TearDownSuite(c *C) { + s.domain.Close() + s.store.Close() + testleak.AfterTest(c)() +} + +func (s *testSuite) TearDownTest(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + r := tk.MustQuery("show tables") + for _, tb := range r.Rows() { + tableName := tb[0] + tk.MustExec(fmt.Sprintf("drop table %v", tableName)) + } +} + +func (s *testSuite) cleanBindingEnv(tk *testkit.TestKit) { + tk.MustExec("drop table if exists mysql.bind_info") + tk.MustExec(session.CreateBindInfoTable) +} + +func (s *testSuite) TestBindParse(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table t(i int)") + tk.MustExec("create index index_t on t(i)") + + originSQL := "select * from t" + bindSQL := "select * from t use index(index_t)" + defaultDb := "test" + status := "using" + charset := "utf8mb4" + collation := "utf8mb4_bin" + sql := fmt.Sprintf(`INSERT INTO mysql.bind_info(original_sql,bind_sql,default_db,status,create_time,update_time,charset,collation) VALUES ('%s', '%s', '%s', '%s', NOW(), NOW(),'%s', '%s')`, + originSQL, bindSQL, defaultDb, status, charset, collation) + tk.MustExec(sql) + bindHandle := bindinfo.NewHandle() + bindCacheUpdater := bindinfo.NewBindCacheUpdater(tk.Se, bindHandle, s.Parser) + err := bindCacheUpdater.Update(true) + c.Check(err, IsNil) + c.Check(len(bindHandle.Get()), Equals, 1) + + hash := parser.DigestHash("select * from t") + bindData := bindHandle.Get()[hash] + c.Check(bindData, NotNil) + c.Check(len(bindData), Equals, 1) + c.Check(bindData[0].OriginalSQL, Equals, "select * from t") + c.Check(bindData[0].BindSQL, Equals, "select * from t use index(index_t)") + c.Check(bindData[0].Db, Equals, "test") + c.Check(bindData[0].Status, Equals, "using") + c.Check(bindData[0].Charset, Equals, "utf8mb4") + c.Check(bindData[0].Collation, Equals, "utf8mb4_bin") + c.Check(bindData[0].CreateTime, NotNil) + c.Check(bindData[0].UpdateTime, NotNil) +} diff --git a/bindinfo/cache.go b/bindinfo/cache.go new file mode 100644 index 0000000000000..bbb083100f98a --- /dev/null +++ b/bindinfo/cache.go @@ -0,0 +1,193 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package bindinfo + +import ( + "context" + "fmt" + "sync/atomic" + + "github.com/pingcap/errors" + "github.com/pingcap/parser" + "github.com/pingcap/parser/ast" + "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/sqlexec" +) + +const ( + using = "using" + deleted = "deleted" +) + +// bindMeta stores the basic bind info and bindSql astNode. +type bindMeta struct { + *bindRecord + ast ast.StmtNode //ast will be used to do query sql bind check +} + +// cache is a k-v map, key is original sql, value is a slice of bindMeta. +type cache map[string][]*bindMeta + +// Handle holds an atomic cache. +type Handle struct { + atomic.Value +} + +// BindCacheUpdater is used to update the global cache. +// BindCacheUpdater will update the bind cache per 3 seconds in domain +// gorountine loop. When the tidb server first startup, the updater will load +// all bind info into memory; then load diff bind info per 3 second. +type BindCacheUpdater struct { + ctx sessionctx.Context + + parser *parser.Parser + lastUpdateTime types.Time + globalHandle *Handle +} + +type bindRecord struct { + OriginalSQL string + BindSQL string + Db string + // Status represents the status of the binding. It can only be one of the following values: + // 1. deleted: bindRecord is deleted, can not be used anymore. + // 2. using: bindRecord is in the normal active mode. + Status string + CreateTime types.Time + UpdateTime types.Time + Charset string + Collation string +} + +// NewBindCacheUpdater creates a new BindCacheUpdater. +func NewBindCacheUpdater(ctx sessionctx.Context, handle *Handle, parser *parser.Parser) *BindCacheUpdater { + return &BindCacheUpdater{ + ctx: ctx, + parser: parser, + globalHandle: handle, + } +} + +// NewHandle creates a Handle with a cache. +func NewHandle() *Handle { + handle := &Handle{} + return handle +} + +// Get gets cache from a Handle. +func (h *Handle) Get() cache { + bc := h.Load() + if bc != nil { + return bc.(map[string][]*bindMeta) + } + return make(map[string][]*bindMeta) +} + +// LoadDiff is used to load new bind info to cache bc. +func (bindCacheUpdater *BindCacheUpdater) loadDiff(sql string, bc cache) error { + recordSets, err := bindCacheUpdater.ctx.(sqlexec.SQLExecutor).Execute(context.Background(), sql) + if err != nil { + return errors.Trace(err) + } + + rs := recordSets[0] + defer terror.Call(rs.Close) + chkBatch := rs.NewRecordBatch() + for { + err = rs.Next(context.TODO(), chkBatch) + if err != nil || chkBatch.NumRows() == 0 { + return errors.Trace(err) + } + + it := chunk.NewIterator4Chunk(chkBatch.Chunk) + for row := it.Begin(); row != it.End(); row = it.Next() { + record := newBindMeta(row) + err = bc.appendNode(record, bindCacheUpdater.parser) + if err != nil { + return err + } + if record.UpdateTime.Compare(bindCacheUpdater.lastUpdateTime) == 1 { + bindCacheUpdater.lastUpdateTime = record.UpdateTime + } + } + } +} + +// Update updates the BindCacheUpdater's cache. +// The `fullLoad` is true only when tidb first startup, otherwise it is false. +func (bindCacheUpdater *BindCacheUpdater) Update(fullLoad bool) (err error) { + var sql string + bc := bindCacheUpdater.globalHandle.Get() + newBc := make(map[string][]*bindMeta, len(bc)) + for hash, bindDataArr := range bc { + newBc[hash] = append(newBc[hash], bindDataArr...) + } + + if fullLoad { + sql = "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation from mysql.bind_info" + } else { + sql = fmt.Sprintf("select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation from mysql.bind_info where update_time > \"%s\"", bindCacheUpdater.lastUpdateTime.String()) + } + err = bindCacheUpdater.loadDiff(sql, newBc) + if err != nil { + return errors.Trace(err) + } + + bindCacheUpdater.globalHandle.Store(newBc) + return nil +} + +func newBindMeta(row chunk.Row) *bindRecord { + return &bindRecord{ + OriginalSQL: row.GetString(0), + BindSQL: row.GetString(1), + Db: row.GetString(2), + Status: row.GetString(3), + CreateTime: row.GetTime(4), + UpdateTime: row.GetTime(5), + Charset: row.GetString(6), + Collation: row.GetString(7), + } +} + +func (b cache) appendNode(newBindRecord *bindRecord, sparser *parser.Parser) error { + hash := parser.DigestHash(newBindRecord.OriginalSQL) + if bindArr, ok := b[hash]; ok { + for idx, v := range bindArr { + if v.OriginalSQL == newBindRecord.OriginalSQL && v.Db == newBindRecord.Db { + b[hash] = append(b[hash][:idx], b[hash][idx+1:]...) + if len(b[hash]) == 0 { + delete(b, hash) + } + break + } + } + } + if newBindRecord.Status == deleted { + return nil + } + stmtNodes, _, err := sparser.Parse(newBindRecord.BindSQL, newBindRecord.Charset, newBindRecord.Collation) + if err != nil { + return errors.Trace(err) + } + newNode := &bindMeta{ + bindRecord: newBindRecord, + ast: stmtNodes[0], + } + b[hash] = append(b[hash], newNode) + return nil +} diff --git a/domain/domain.go b/domain/domain.go index 5d70dd57cfb51..7db4864f59cfa 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -23,13 +23,15 @@ import ( "unsafe" "github.com/coreos/etcd/clientv3" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/ngaut/pools" "github.com/ngaut/sync2" "github.com/pingcap/errors" + "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/infoschema" @@ -43,8 +45,10 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/sqlexec" log "github.com/sirupsen/logrus" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" ) @@ -55,6 +59,7 @@ type Domain struct { store kv.Storage infoHandle *infoschema.Handle privHandle *privileges.Handle + bindHandle *bindinfo.Handle statsHandle unsafe.Pointer statsLease time.Duration statsUpdating sync2.AtomicInt32 @@ -794,6 +799,41 @@ func (do *Domain) PrivilegeHandle() *privileges.Handle { return do.privHandle } +// BindHandle returns domain's bindHandle. +func (do *Domain) BindHandle() *bindinfo.Handle { + return do.bindHandle +} + +// LoadBindInfoLoop create a goroutine loads BindInfo in a loop, it should +// be called only once in BootstrapSession. +func (do *Domain) LoadBindInfoLoop(ctx sessionctx.Context, parser *parser.Parser) error { + ctx.GetSessionVars().InRestrictedSQL = true + do.bindHandle = bindinfo.NewHandle() + + bindCacheUpdater := bindinfo.NewBindCacheUpdater(ctx, do.BindHandle(), parser) + err := bindCacheUpdater.Update(true) + if err != nil { + return errors.Trace(err) + } + + duration := 3 * time.Second + go func() { + defer recoverInDomain("loadBindInfoLoop", false) + for { + select { + case <-do.exit: + return + case <-time.After(duration): + } + err = bindCacheUpdater.Update(false) + if err != nil { + logutil.Logger(context.Background()).Error("update bindinfo failed", zap.Error(err)) + } + } + }() + return nil +} + // StatsHandle returns the statistic handle. func (do *Domain) StatsHandle() *statistics.Handle { return (*statistics.Handle)(atomic.LoadPointer(&do.statsHandle)) diff --git a/session/bootstrap.go b/session/bootstrap.go index 925f24d8cdb08..eb2d0ea5b36cd 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -214,6 +214,19 @@ const ( index hist(table_id, is_index, hist_id) );` + // CreateBindInfoTable stores the sql bind info which is used to update globalBindCache. + CreateBindInfoTable = `CREATE TABLE IF NOT EXISTS mysql.bind_info ( + original_sql text NOT NULL , + bind_sql text NOT NULL , + default_db text NOT NULL, + status text NOT NULL, + create_time timestamp NOT NULL, + update_time timestamp NOT NULL, + charset text NOT NULL, + collation text NOT NULL, + INDEX time_index(update_time) COMMENT "accelerate the speed when querying with last update time" + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;` + // CreateRoleEdgesTable stores the role and user relationship information. CreateRoleEdgesTable = `CREATE TABLE IF NOT EXISTS mysql.role_edges ( FROM_HOST char(60) COLLATE utf8_bin NOT NULL DEFAULT '', @@ -288,6 +301,7 @@ const ( version25 = 25 version26 = 26 version27 = 27 + version28 = 28 ) func checkBootstrapped(s Session) (bool, error) { @@ -454,6 +468,10 @@ func upgrade(s Session) { upgradeToVer27(s) } + if ver < version28 { + upgradeToVer28(s) + } + updateBootstrapVer(s) _, err = s.Execute(context.Background(), "COMMIT") @@ -729,6 +747,10 @@ func upgradeToVer27(s Session) { doReentrantDDL(s, "ALTER TABLE mysql.stats_histograms ADD COLUMN `correlation` double NOT NULL DEFAULT 0", infoschema.ErrColumnExists) } +func upgradeToVer28(s Session) { + doReentrantDDL(s, CreateBindInfoTable) +} + // updateBootstrapVer updates bootstrap version variable in mysql.TiDB table. func updateBootstrapVer(s Session) { // Update bootstrap version. @@ -783,6 +805,8 @@ func doDDLWorks(s Session) { mustExecute(s, CreateRoleEdgesTable) // Create default_roles table. mustExecute(s, CreateDefaultRolesTable) + // Create bind_info table. + mustExecute(s, CreateBindInfoTable) } // doDMLWorks executes DML statements in bootstrap stage. diff --git a/session/session.go b/session/session.go index e897c41b41875..22734dbe9af58 100644 --- a/session/session.go +++ b/session/session.go @@ -1320,6 +1320,14 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { if err != nil { return nil, errors.Trace(err) } + se2, err := createSession(store) + if err != nil { + return nil, errors.Trace(err) + } + err = dom.LoadBindInfoLoop(se2, se2.parser) + if err != nil { + return nil, errors.Trace(err) + } // get global system variable tidb_log_bin from mysql.GLOBAL_VARIABLES tidbLogBin, err := se1.GetGlobalSysVar(variable.TiDBLogBin) @@ -1419,7 +1427,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er const ( notBootstrapped = 0 - currentBootstrapVersion = 27 + currentBootstrapVersion = 28 ) func getStoreBootstrapVersion(store kv.Storage) int64 {