diff --git a/errno/errcode.go b/errno/errcode.go index 6e13223d129aa..638e1b889a902 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1048,6 +1048,8 @@ const ( ErrDDLReorgElementNotExist = 8235 ErrPlacementPolicyCheck = 8236 ErrInvalidAttributesSpec = 8237 + ErrPlacementPolicyExists = 8238 + ErrPlacementPolicyNotExists = 8239 // TiKV/PD/TiFlash errors. ErrPDServerTimeout = 9001 diff --git a/errno/errname.go b/errno/errname.go index b52e528eb146e..025caa8f0e4dd 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1050,11 +1050,13 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrPartitionStatsMissing: mysql.Message("Build table: %s global-level stats failed due to missing partition-level stats", nil), ErrNotSupportedWithSem: mysql.Message("Feature '%s' is not supported when security enhanced mode is enabled", nil), - ErrInvalidPlacementSpec: mysql.Message("Invalid placement policy '%s': %s", nil), - ErrPlacementPolicyCheck: mysql.Message("Placement policy didn't meet the constraint, reason: %s", nil), - ErrMultiStatementDisabled: mysql.Message("client has multi-statement capability disabled. Run SET GLOBAL tidb_multi_statement_mode='ON' after you understand the security risk", nil), - ErrAsOf: mysql.Message("invalid as of timestamp: %s", nil), - ErrInvalidAttributesSpec: mysql.Message("Invalid attributes '%s': %s", nil), + ErrInvalidPlacementSpec: mysql.Message("Invalid placement policy '%s': %s", nil), + ErrPlacementPolicyCheck: mysql.Message("Placement policy didn't meet the constraint, reason: %s", nil), + ErrMultiStatementDisabled: mysql.Message("client has multi-statement capability disabled. Run SET GLOBAL tidb_multi_statement_mode='ON' after you understand the security risk", nil), + ErrAsOf: mysql.Message("invalid as of timestamp: %s", nil), + ErrInvalidAttributesSpec: mysql.Message("Invalid attributes '%s': %s", nil), + ErrPlacementPolicyExists: mysql.Message("Can't create placement policy '%-.192s'; policy exists", nil), + ErrPlacementPolicyNotExists: mysql.Message("Unknown placement policy '%-.192s'", nil), // TiKV/PD errors. ErrPDServerTimeout: mysql.Message("PD server timeout", nil), diff --git a/errors.toml b/errors.toml index cd2bdd0b89abc..a6143d74bcd2a 100644 --- a/errors.toml +++ b/errors.toml @@ -1061,6 +1061,16 @@ error = ''' DDL reorg element does not exist ''' +["meta:8238"] +error = ''' +Can't create placement policy '%-.192s'; policy exists +''' + +["meta:8239"] +error = ''' +Unknown placement policy '%-.192s' +''' + ["planner:1044"] error = ''' Access denied for user '%-.48s'@'%-.64s' to database '%-.192s' diff --git a/infoschema/builder.go b/infoschema/builder.go index 88847ba544c5a..d86ef74e3ddce 100644 --- a/infoschema/builder.go +++ b/infoschema/builder.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/domainutil" + "github.com/pingcap/tidb/util/placementpolicy" ) // Builder builds a new InfoSchema. @@ -485,6 +486,7 @@ func (b *Builder) InitWithOldInfoSchema(oldSchema InfoSchema) *Builder { b.is.schemaMetaVersion = oldIS.schemaMetaVersion b.copySchemasMap(oldIS) b.copyBundlesMap(oldIS) + b.copyPoliciesMap(oldIS) copy(b.is.sortedTablesBuckets, oldIS.sortedTablesBuckets) return b } @@ -502,6 +504,15 @@ func (b *Builder) copyBundlesMap(oldIS *infoSchema) { } } +func (b *Builder) copyPoliciesMap(oldIS *infoSchema) { + is := b.is + is.policyMutex.Lock() + defer is.policyMutex.Unlock() + for _, v := range oldIS.PlacementPolicies() { + is.policyMap[v.Name.L] = v + } +} + // copySchemaTables creates a new schemaTables instance when a table in the database has changed. // It also does modifications on the new one because old schemaTables must be read-only. // Note: please make sure the dbName is in lowercase. @@ -589,6 +600,7 @@ func NewBuilder(store kv.Storage) *Builder { store: store, is: &infoSchema{ schemaMap: map[string]*schemaTables{}, + policyMap: map[string]*placementpolicy.PolicyInfo{}, ruleBundleMap: map[string]*placement.Bundle{}, sortedTablesBuckets: make([]sortedTables, bucketCount), }, diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index e9131bff5d822..b13d0ede1fdbe 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/placementpolicy" ) // InfoSchema is the interface used to retrieve the schema information. @@ -94,6 +95,10 @@ type infoSchema struct { ruleBundleMutex sync.RWMutex ruleBundleMap map[string]*placement.Bundle + // policyMap stores all placement policies. + policyMutex sync.RWMutex + policyMap map[string]*placementpolicy.PolicyInfo + schemaMap map[string]*schemaTables // sortedTablesBuckets is a slice of sortedTables, a table's bucket index is (tableID % bucketCount). @@ -107,6 +112,7 @@ type infoSchema struct { func MockInfoSchema(tbList []*model.TableInfo) InfoSchema { result := &infoSchema{} result.schemaMap = make(map[string]*schemaTables) + result.policyMap = make(map[string]*placementpolicy.PolicyInfo) result.ruleBundleMap = make(map[string]*placement.Bundle) result.sortedTablesBuckets = make([]sortedTables, bucketCount) dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList} @@ -131,6 +137,7 @@ func MockInfoSchema(tbList []*model.TableInfo) InfoSchema { func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) InfoSchema { result := &infoSchema{} result.schemaMap = make(map[string]*schemaTables) + result.policyMap = make(map[string]*placementpolicy.PolicyInfo) result.ruleBundleMap = make(map[string]*placement.Bundle) result.sortedTablesBuckets = make([]sortedTables, bucketCount) dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList} @@ -347,6 +354,24 @@ func HasAutoIncrementColumn(tbInfo *model.TableInfo) (bool, string) { return false, "" } +// PolicyByName is used to find the policy. +func (is *infoSchema) PolicyByName(name string) (*placementpolicy.PolicyInfo, bool) { + is.policyMutex.RLock() + defer is.policyMutex.RUnlock() + t, r := is.policyMap[name] + return t, r +} + +func (is *infoSchema) PlacementPolicies() []*placementpolicy.PolicyInfo { + is.policyMutex.RLock() + defer is.policyMutex.RUnlock() + policies := make([]*placementpolicy.PolicyInfo, 0, len(is.policyMap)) + for _, policy := range is.policyMap { + policies = append(policies, policy) + } + return policies +} + func (is *infoSchema) BundleByName(name string) (*placement.Bundle, bool) { is.ruleBundleMutex.RLock() defer is.ruleBundleMutex.RUnlock() diff --git a/meta/meta.go b/meta/meta.go index e222cc126ddd3..e4872e7d08740 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -35,11 +35,13 @@ import ( "github.com/pingcap/tidb/structure" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/logutil" + placement "github.com/pingcap/tidb/util/placementpolicy" "go.uber.org/zap" ) var ( globalIDMutex sync.Mutex + policyIDMutex sync.Mutex ) // Meta structure: @@ -70,6 +72,25 @@ var ( mRandomIDPrefix = "TARID" mBootstrapKey = []byte("BootstrapKey") mSchemaDiffPrefix = "Diff" + mPolicies = []byte("Policies") + mPolicyPrefix = "Policy" + mPolicyGlobalID = []byte("PolicyGlobalID") + mPolicyMagicByte = CurrentMagicByteVer +) + +const ( + // CurrentMagicByteVer is the current magic byte version, used for future meta compatibility. + CurrentMagicByteVer byte = 0x00 + // PolicyMagicByte handler + // 0x00 - 0x3F: Json Handler + // 0x40 - 0x7F: Reserved + // 0x80 - 0xBF: Reserved + // 0xC0 - 0xFF: Reserved + + // type means how to handle the serialized data. + typeUnknown int = 0 + typeJSON int = 1 + // todo: customized handler. ) var ( @@ -77,6 +98,10 @@ var ( ErrDBExists = dbterror.ClassMeta.NewStd(mysql.ErrDBCreateExists) // ErrDBNotExists is the error for db not exists. ErrDBNotExists = dbterror.ClassMeta.NewStd(mysql.ErrBadDB) + // ErrPolicyExists is the error for policy exists. + ErrPolicyExists = dbterror.ClassMeta.NewStd(errno.ErrPlacementPolicyExists) + // ErrPolicyNotExists is the error for policy not exists. + ErrPolicyNotExists = dbterror.ClassMeta.NewStd(errno.ErrPlacementPolicyNotExists) // ErrTableExists is the error for table exists. ErrTableExists = dbterror.ClassMeta.NewStd(mysql.ErrTableExists) // ErrTableNotExists is the error for table not exists. @@ -145,6 +170,15 @@ func (m *Meta) GetGlobalID() (int64, error) { return m.txn.GetInt64(mNextGlobalIDKey) } +// GetPolicyID gets current policy global id. +func (m *Meta) GetPolicyID() (int64, error) { + return m.txn.GetInt64(mPolicyGlobalID) +} + +func (m *Meta) policyKey(policyID int64) []byte { + return []byte(fmt.Sprintf("%s:%d", mPolicyPrefix, policyID)) +} + func (m *Meta) dbKey(dbID int64) []byte { return []byte(fmt.Sprintf("%s:%d", mDBPrefix, dbID)) } @@ -268,6 +302,22 @@ func (m *Meta) GenSchemaVersion() (int64, error) { return m.txn.Inc(mSchemaVersionKey, 1) } +func (m *Meta) checkPolicyExists(policyKey []byte) error { + v, err := m.txn.HGet(mPolicies, policyKey) + if err == nil && v == nil { + err = ErrPolicyNotExists.GenWithStack("policy doesn't exist") + } + return errors.Trace(err) +} + +func (m *Meta) checkPolicyNotExists(policyKey []byte) error { + v, err := m.txn.HGet(mPolicies, policyKey) + if err == nil && v != nil { + err = ErrPolicyExists.GenWithStack("policy already exists") + } + return errors.Trace(err) +} + func (m *Meta) checkDBExists(dbKey []byte) error { v, err := m.txn.HGet(mDBs, dbKey) if err == nil && v == nil { @@ -300,6 +350,46 @@ func (m *Meta) checkTableNotExists(dbKey []byte, tableKey []byte) error { return errors.Trace(err) } +// CreatePolicy creates a policy. +func (m *Meta) CreatePolicy(policy *placement.PolicyInfo) error { + if policy.ID != 0 { + policyKey := m.policyKey(policy.ID) + if err := m.checkPolicyNotExists(policyKey); err != nil { + return errors.Trace(err) + } + } else { + // Autofill the policy ID. + policyIDMutex.Lock() + genID, err := m.txn.Inc(mPolicyGlobalID, 1) + if err != nil { + return errors.Trace(err) + } + policyIDMutex.Unlock() + policy.ID = genID + } + policyKey := m.policyKey(policy.ID) + data, err := json.Marshal(policy) + if err != nil { + return errors.Trace(err) + } + return m.txn.HSet(mPolicies, policyKey, attachMagicByte(data)) +} + +// UpdatePolicy updates a policy. +func (m *Meta) UpdatePolicy(policy *placement.PolicyInfo) error { + policyKey := m.policyKey(policy.ID) + + if err := m.checkPolicyExists(policyKey); err != nil { + return errors.Trace(err) + } + + data, err := json.Marshal(policy) + if err != nil { + return errors.Trace(err) + } + return m.txn.HSet(mPolicies, policyKey, attachMagicByte(data)) +} + // CreateDatabase creates a database with db info. func (m *Meta) CreateDatabase(dbInfo *model.DBInfo) error { dbKey := m.dbKey(dbInfo.ID) @@ -552,6 +642,73 @@ func (m *Meta) GetDatabase(dbID int64) (*model.DBInfo, error) { return dbInfo, errors.Trace(err) } +// ListPolicies shows all policies. +func (m *Meta) ListPolicies() ([]*placement.PolicyInfo, error) { + res, err := m.txn.HGetAll(mPolicies) + if err != nil { + return nil, errors.Trace(err) + } + + policies := make([]*placement.PolicyInfo, 0, len(res)) + for _, r := range res { + value, err := detachMagicByte(r.Value) + if err != nil { + return nil, errors.Trace(err) + } + policy := &placement.PolicyInfo{} + err = json.Unmarshal(value, policy) + if err != nil { + return nil, errors.Trace(err) + } + policies = append(policies, policy) + } + return policies, nil +} + +// GetPolicy gets the database value with ID. +func (m *Meta) GetPolicy(policyID int64) (*placement.PolicyInfo, error) { + policyKey := m.policyKey(policyID) + value, err := m.txn.HGet(mPolicies, policyKey) + if err != nil || value == nil { + return nil, errors.Trace(err) + } + value, err = detachMagicByte(value) + if err != nil { + return nil, errors.Trace(err) + } + + policy := &placement.PolicyInfo{} + err = json.Unmarshal(value, policy) + return policy, errors.Trace(err) +} + +func attachMagicByte(data []byte) []byte { + data = append(data, 0) + copy(data[1:], data) + data[0] = mPolicyMagicByte + return data +} + +func detachMagicByte(value []byte) ([]byte, error) { + magic, data := value[:1], value[1:] + switch whichMagicType(magic[0]) { + case typeJSON: + if magic[0] != CurrentMagicByteVer { + return nil, errors.New("incompatible magic type handling module") + } + return data, nil + default: + return nil, errors.New("unknown magic type handling module") + } +} + +func whichMagicType(b byte) int { + if b <= 0x3F { + return typeJSON + } + return typeUnknown +} + // GetTable gets the table value in database with tableID. func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) { // Check if db exists. diff --git a/meta/meta_test.go b/meta/meta_test.go index 15cf516eeaa81..d5512a1145ced 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -29,9 +29,80 @@ import ( "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/placementpolicy" "github.com/stretchr/testify/require" ) +func TestPlacementPolicy(t *testing.T) { + t.Parallel() + + store, err := mockstore.NewMockStore() + require.NoError(t, err) + + defer func() { + err := store.Close() + require.NoError(t, err) + }() + + txn, err := store.Begin() + require.NoError(t, err) + + // test the independent policy ID allocation. + m := meta.NewMeta(txn) + + // test the meta storage of placemnt policy. + policy := &placementpolicy.PolicyInfo{ + Name: model.NewCIStr("aa"), + PrimaryRegion: "my primary", + Regions: "my regions", + Learners: 1, + Followers: 2, + Voters: 3, + Schedule: "even", + Constraints: "+disk=ssd", + LearnerConstraints: "+zone=shanghai", + } + err = m.CreatePolicy(policy) + require.NoError(t, err) + require.Equal(t, policy.ID, int64(1)) + + err = m.CreatePolicy(policy) + require.NotNil(t, err) + require.True(t, meta.ErrPolicyExists.Equal(err)) + + val, err := m.GetPolicy(1) + require.NoError(t, err) + require.Equal(t, policy, val) + + // mock updating the placement policy. + policy.Name = model.NewCIStr("bb") + policy.LearnerConstraints = "+zone=nanjing" + err = m.UpdatePolicy(policy) + require.NoError(t, err) + + val, err = m.GetPolicy(1) + require.NoError(t, err) + require.Equal(t, policy, val) + + ps, err := m.ListPolicies() + require.NoError(t, err) + require.Equal(t, []*placementpolicy.PolicyInfo{policy}, ps) + + err = txn.Commit(context.Background()) + require.NoError(t, err) + + // fetch the stored value after committing. + txn, err = store.Begin() + require.NoError(t, err) + + m = meta.NewMeta(txn) + val, err = m.GetPolicy(1) + require.NoError(t, err) + require.Equal(t, policy, val) + err = txn.Commit(context.Background()) + require.NoError(t, err) +} + func TestMeta(t *testing.T) { t.Parallel() diff --git a/util/placementpolicy/policy.go b/util/placementpolicy/policy.go new file mode 100644 index 0000000000000..4768e76ed43aa --- /dev/null +++ b/util/placementpolicy/policy.go @@ -0,0 +1,36 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package placementpolicy + +import ( + "github.com/pingcap/parser/model" +) + +// PolicyInfo is the struct to store the placement policy. +type PolicyInfo struct { + ID int64 `json:"id"` + Name model.CIStr `json:"name"` + PrimaryRegion string `json:"primary_region"` + Regions string `json:"regions"` + Learners uint64 `json:"learners"` + Followers uint64 `json:"followers"` + Voters uint64 `json:"voters"` + Schedule string `json:"schedule"` + Constraints string `json:"constraints"` + LearnerConstraints string `json:"learner_constraints"` + FollowerConstraints string `json:"follower_constraints"` + VoterConstraints string `json:"voter_constraints"` + State model.SchemaState `json:"state"` +}