*: create sql bind table during bootstrap (#9008)
iamzhoug37 authored and zz-jason committed Mar 15, 2019
1 parent 4454461 commit 0618339
Showing 5 changed files with 403 additions and 2 deletions.
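Only the two bindinfo files are rendered below; the bootstrap change that actually creates the table (the DDL behind session.CreateBindInfoTable) sits in the remaining files. As a rough sketch, inferred solely from the columns the tests insert and read back, the constant plausibly looks something like the following — the column types and the indexes here are assumptions, not the authoritative DDL:

// A sketch of the DDL that session.CreateBindInfoTable might execute, inferred
// from the columns exercised in bind_test.go; the real definition lives in the
// bootstrap code, which is not rendered on this page. The indexes are guesses
// (loadDiff filters on update_time, so an index there seems plausible).
const CreateBindInfoTable = `CREATE TABLE IF NOT EXISTS mysql.bind_info (
	original_sql TEXT NOT NULL,
	bind_sql     TEXT NOT NULL,
	default_db   TEXT NOT NULL,
	status       TEXT NOT NULL,
	create_time  TIMESTAMP(3) NOT NULL,
	update_time  TIMESTAMP(3) NOT NULL,
	charset      TEXT NOT NULL,
	collation    TEXT NOT NULL,
	INDEX sql_index (original_sql(1024), default_db(1024)),
	INDEX time_index (update_time)
);`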
136 changes: 136 additions & 0 deletions bindinfo/bind_test.go
@@ -0,0 +1,136 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package bindinfo_test

import (
"flag"
"fmt"
"os"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/parser"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mocktikv"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
)

func TestT(t *testing.T) {
CustomVerboseFlag = true
logLevel := os.Getenv("log_level")
logutil.InitLogger(logutil.NewLogConfig(logLevel, logutil.DefaultLogFormat, "", logutil.EmptyFileLogConfig, false))
autoid.SetStep(5000)
TestingT(t)
}

var _ = Suite(&testSuite{})

type testSuite struct {
cluster *mocktikv.Cluster
mvccStore mocktikv.MVCCStore
store kv.Storage
domain *domain.Domain
*parser.Parser
ctx *mock.Context
}

var mockTikv = flag.Bool("mockTikv", true, "use mock tikv store in bind test")

func (s *testSuite) SetUpSuite(c *C) {
testleak.BeforeTest()
s.Parser = parser.New()
flag.Lookup("mockTikv")
useMockTikv := *mockTikv
if useMockTikv {
s.cluster = mocktikv.NewCluster()
mocktikv.BootstrapWithSingleStore(s.cluster)
s.mvccStore = mocktikv.MustNewMVCCStore()
store, err := mockstore.NewMockTikvStore(
mockstore.WithCluster(s.cluster),
mockstore.WithMVCCStore(s.mvccStore),
)
c.Assert(err, IsNil)
s.store = store
session.SetSchemaLease(0)
session.SetStatsLease(0)
}
d, err := session.BootstrapSession(s.store)
c.Assert(err, IsNil)
d.SetStatsUpdating(true)
s.domain = d
}

func (s *testSuite) TearDownSuite(c *C) {
s.domain.Close()
s.store.Close()
testleak.AfterTest(c)()
}

func (s *testSuite) TearDownTest(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
r := tk.MustQuery("show tables")
for _, tb := range r.Rows() {
tableName := tb[0]
tk.MustExec(fmt.Sprintf("drop table %v", tableName))
}
}

func (s *testSuite) cleanBindingEnv(tk *testkit.TestKit) {
tk.MustExec("drop table if exists mysql.bind_info")
tk.MustExec(session.CreateBindInfoTable)
}

func (s *testSuite) TestBindParse(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t(i int)")
tk.MustExec("create index index_t on t(i)")

originSQL := "select * from t"
bindSQL := "select * from t use index(index_t)"
defaultDb := "test"
status := "using"
charset := "utf8mb4"
collation := "utf8mb4_bin"
sql := fmt.Sprintf(`INSERT INTO mysql.bind_info(original_sql,bind_sql,default_db,status,create_time,update_time,charset,collation) VALUES ('%s', '%s', '%s', '%s', NOW(), NOW(),'%s', '%s')`,
originSQL, bindSQL, defaultDb, status, charset, collation)
tk.MustExec(sql)
bindHandle := bindinfo.NewHandle()
bindCacheUpdater := bindinfo.NewBindCacheUpdater(tk.Se, bindHandle, s.Parser)
err := bindCacheUpdater.Update(true)
c.Check(err, IsNil)
c.Check(len(bindHandle.Get()), Equals, 1)

hash := parser.DigestHash("select * from t")
bindData := bindHandle.Get()[hash]
c.Check(bindData, NotNil)
c.Check(len(bindData), Equals, 1)
c.Check(bindData[0].OriginalSQL, Equals, "select * from t")
c.Check(bindData[0].BindSQL, Equals, "select * from t use index(index_t)")
c.Check(bindData[0].Db, Equals, "test")
c.Check(bindData[0].Status, Equals, "using")
c.Check(bindData[0].Charset, Equals, "utf8mb4")
c.Check(bindData[0].Collation, Equals, "utf8mb4_bin")
c.Check(bindData[0].CreateTime, NotNil)
c.Check(bindData[0].UpdateTime, NotNil)
}
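The test above exercises the write path: a row is inserted into mysql.bind_info, Update(true) pulls it into the handle, and each field is checked. For orientation, here is a hypothetical read-path sketch of how a caller could pick a binding out of the handle by digest; the helper name pickBinding, the package name, and the matching rule are assumptions and not part of this commit.

// pickBinding is a hypothetical helper, shown only to illustrate how the cache
// built in this commit could be consulted; it is not part of the diff.
package binddemo

import (
	"github.com/pingcap/parser"
	"github.com/pingcap/tidb/bindinfo"
)

// pickBinding returns the bind SQL cached for normalizedSQL in db, or "" when
// no active ("using") binding exists.
func pickBinding(handle *bindinfo.Handle, normalizedSQL, db string) string {
	hash := parser.DigestHash(normalizedSQL)
	for _, meta := range handle.Get()[hash] {
		if meta.Status == "using" && meta.Db == db {
			return meta.BindSQL
		}
	}
	return ""
}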
193 changes: 193 additions & 0 deletions bindinfo/cache.go
@@ -0,0 +1,193 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package bindinfo

import (
"context"
"fmt"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
)

const (
using = "using"
deleted = "deleted"
)

// bindMeta stores the basic bind info and the AST of the bind SQL.
type bindMeta struct {
*bindRecord
ast ast.StmtNode // the AST of the bind SQL, used when checking whether a query can use this binding
}

// cache is a k-v map: the key is the hash digest of the original SQL and the value is a slice of bindMeta.
type cache map[string][]*bindMeta

// Handle holds an atomic cache.
type Handle struct {
atomic.Value
}

// BindCacheUpdater is used to update the global cache.
// BindCacheUpdater refreshes the bind cache every 3 seconds in a domain
// goroutine loop. When the TiDB server first starts up, the updater loads all
// bind info into memory; after that it only loads the records changed since
// the last update.
type BindCacheUpdater struct {
ctx sessionctx.Context

parser *parser.Parser
lastUpdateTime types.Time
globalHandle *Handle
}

type bindRecord struct {
OriginalSQL string
BindSQL string
Db string
// Status represents the status of the binding. It can only be one of the following values:
// 1. deleted: the bindRecord has been deleted and can no longer be used.
// 2. using: the bindRecord is active and can be used normally.
Status string
CreateTime types.Time
UpdateTime types.Time
Charset string
Collation string
}

// NewBindCacheUpdater creates a new BindCacheUpdater.
func NewBindCacheUpdater(ctx sessionctx.Context, handle *Handle, parser *parser.Parser) *BindCacheUpdater {
return &BindCacheUpdater{
ctx: ctx,
parser: parser,
globalHandle: handle,
}
}

// NewHandle creates an empty Handle; the cache is filled in later by BindCacheUpdater.Update.
func NewHandle() *Handle {
handle := &Handle{}
return handle
}

// Get returns the cache stored in the Handle, or an empty cache if nothing has been stored yet.
func (h *Handle) Get() cache {
bc := h.Load()
if bc != nil {
return bc.(map[string][]*bindMeta)
}
return make(map[string][]*bindMeta)
}

// loadDiff loads the bind info selected by sql into the cache bc.
func (bindCacheUpdater *BindCacheUpdater) loadDiff(sql string, bc cache) error {
recordSets, err := bindCacheUpdater.ctx.(sqlexec.SQLExecutor).Execute(context.Background(), sql)
if err != nil {
return errors.Trace(err)
}

rs := recordSets[0]
defer terror.Call(rs.Close)
chkBatch := rs.NewRecordBatch()
for {
err = rs.Next(context.TODO(), chkBatch)
if err != nil || chkBatch.NumRows() == 0 {
return errors.Trace(err)
}

it := chunk.NewIterator4Chunk(chkBatch.Chunk)
for row := it.Begin(); row != it.End(); row = it.Next() {
record := newBindMeta(row)
err = bc.appendNode(record, bindCacheUpdater.parser)
if err != nil {
return err
}
if record.UpdateTime.Compare(bindCacheUpdater.lastUpdateTime) == 1 {
bindCacheUpdater.lastUpdateTime = record.UpdateTime
}
}
}
}

// Update updates the BindCacheUpdater's cache.
// `fullLoad` is true only when the TiDB server first starts up; otherwise it is false.
func (bindCacheUpdater *BindCacheUpdater) Update(fullLoad bool) (err error) {
var sql string
bc := bindCacheUpdater.globalHandle.Get()
newBc := make(map[string][]*bindMeta, len(bc))
for hash, bindDataArr := range bc {
newBc[hash] = append(newBc[hash], bindDataArr...)
}

if fullLoad {
sql = "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation from mysql.bind_info"
} else {
sql = fmt.Sprintf("select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation from mysql.bind_info where update_time > \"%s\"", bindCacheUpdater.lastUpdateTime.String())
}
err = bindCacheUpdater.loadDiff(sql, newBc)
if err != nil {
return errors.Trace(err)
}

bindCacheUpdater.globalHandle.Store(newBc)
return nil
}

func newBindMeta(row chunk.Row) *bindRecord {
return &bindRecord{
OriginalSQL: row.GetString(0),
BindSQL: row.GetString(1),
Db: row.GetString(2),
Status: row.GetString(3),
CreateTime: row.GetTime(4),
UpdateTime: row.GetTime(5),
Charset: row.GetString(6),
Collation: row.GetString(7),
}
}

func (b cache) appendNode(newBindRecord *bindRecord, sparser *parser.Parser) error {
hash := parser.DigestHash(newBindRecord.OriginalSQL)
if bindArr, ok := b[hash]; ok {
for idx, v := range bindArr {
if v.OriginalSQL == newBindRecord.OriginalSQL && v.Db == newBindRecord.Db {
b[hash] = append(b[hash][:idx], b[hash][idx+1:]...)
if len(b[hash]) == 0 {
delete(b, hash)
}
break
}
}
}
if newBindRecord.Status == deleted {
return nil
}
stmtNodes, _, err := sparser.Parse(newBindRecord.BindSQL, newBindRecord.Charset, newBindRecord.Collation)
if err != nil {
return errors.Trace(err)
}
newNode := &bindMeta{
bindRecord: newBindRecord,
ast: stmtNodes[0],
}
b[hash] = append(b[hash], newNode)
return nil
}
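The doc comment on BindCacheUpdater describes the intended driver: a full load at server startup followed by an incremental load every 3 seconds from a domain goroutine. That loop lives outside this diff; below is a minimal sketch of what such a driver could look like, with the function name, package name, and exit-channel wiring assumed — only Update(true/false) comes from this commit.

// loadBindInfoLoop is a sketch of the assumed domain-side driver; it is
// illustrative only and not part of the diff shown above.
package binddemo

import (
	"time"

	"github.com/pingcap/tidb/bindinfo"
)

func loadBindInfoLoop(updater *bindinfo.BindCacheUpdater, exit <-chan struct{}) error {
	// Full load of mysql.bind_info when the server starts up.
	if err := updater.Update(true); err != nil {
		return err
	}
	go func() {
		ticker := time.NewTicker(3 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-exit:
				return
			case <-ticker.C:
				// Incremental load: only rows whose update_time is newer
				// than the last seen timestamp are fetched.
				if err := updater.Update(false); err != nil {
					continue // a real loop would log the error instead
				}
			}
		}
	}()
	return nil
}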