Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

meta: support storing placement policy into meta #27251

Merged
merged 36 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c5efcdd
support storing placement policy in meta
AilinKid Aug 16, 2021
6ebb9f7
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 16, 2021
f982dcb
solve the import problem
AilinKid Aug 16, 2021
02db37d
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 17, 2021
fa317e4
address commnet
AilinKid Aug 17, 2021
a434c68
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 17, 2021
1ded57e
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 17, 2021
91f1d79
add magic number for policy
AilinKid Aug 17, 2021
643499d
.
AilinKid Aug 17, 2021
e9c33de
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 17, 2021
842a6b6
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 17, 2021
ff7dcf0
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 18, 2021
ca1312e
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 18, 2021
bd19369
address comment
AilinKid Aug 18, 2021
3896af7
address comment
AilinKid Aug 18, 2021
70aab7a
address comment
AilinKid Aug 18, 2021
bfcbcf4
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 18, 2021
2e64f19
error
AilinKid Aug 18, 2021
d96affa
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 18, 2021
9c931eb
.
AilinKid Aug 18, 2021
6c977aa
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 18, 2021
ab6720b
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 19, 2021
bcb66cb
address comment
AilinKid Aug 19, 2021
199c22a
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 19, 2021
ffb8605
address comment
AilinKid Aug 20, 2021
1815aca
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 20, 2021
e23ea18
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 20, 2021
3be7de8
address comment
AilinKid Aug 20, 2021
dae2b36
address comment
AilinKid Aug 20, 2021
2c7e5a3
address comment
AilinKid Aug 20, 2021
5dd8798
fix test
AilinKid Aug 20, 2021
7d5a966
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 20, 2021
92232f8
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 21, 2021
f1f3924
change typeUnknow to 0
AilinKid Aug 21, 2021
9df17d0
add magic byte compatibility check
AilinKid Aug 23, 2021
d318359
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions ddl/placement/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package placement

import "github.com/pingcap/parser/model"

// Policy is the struct to store the placement policy.
type Policy struct {
ID int64 `json:"id"`
Name model.CIStr `json:"name"`
PrimaryRegion string `json:"primary_region"`
Regions string `json:"regions"`
Leaders int64 `json:"leaders"`
Followers int64 `json:"followers"`
Voters int64 `json:"voters"`
Schedule string `json:"schedule"`
Constraints Constraints `json:"constraints"`
LeaderConstraints Constraints `json:"leader_constraints"`
FollowerConstraints Constraints `json:"follower_constraints"`
VoterConstraints Constraints `json:"voter_constraints"`
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
}
2 changes: 2 additions & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,8 @@ const (
ErrDDLReorgElementNotExist = 8235
ErrPlacementPolicyCheck = 8236
ErrInvalidAttributesSpec = 8237
ErrPlacementPolicyExists = 8238
ErrPlacementPolicyNotExists = 8239

// TiKV/PD/TiFlash errors.
ErrPDServerTimeout = 9001
Expand Down
12 changes: 7 additions & 5 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,11 +1049,13 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrPartitionStatsMissing: mysql.Message("Build table: %s global-level stats failed due to missing partition-level stats", nil),
ErrNotSupportedWithSem: mysql.Message("Feature '%s' is not supported when security enhanced mode is enabled", nil),

ErrInvalidPlacementSpec: mysql.Message("Invalid placement policy '%s': %s", nil),
ErrPlacementPolicyCheck: mysql.Message("Placement policy didn't meet the constraint, reason: %s", nil),
ErrMultiStatementDisabled: mysql.Message("client has multi-statement capability disabled. Run SET GLOBAL tidb_multi_statement_mode='ON' after you understand the security risk", nil),
ErrAsOf: mysql.Message("invalid as of timestamp: %s", nil),
ErrInvalidAttributesSpec: mysql.Message("Invalid attributes '%s': %s", nil),
ErrInvalidPlacementSpec: mysql.Message("Invalid placement policy '%s': %s", nil),
ErrPlacementPolicyCheck: mysql.Message("Placement policy didn't meet the constraint, reason: %s", nil),
ErrMultiStatementDisabled: mysql.Message("client has multi-statement capability disabled. Run SET GLOBAL tidb_multi_statement_mode='ON' after you understand the security risk", nil),
ErrAsOf: mysql.Message("invalid as of timestamp: %s", nil),
ErrInvalidAttributesSpec: mysql.Message("Invalid attributes '%s': %s", nil),
ErrPlacementPolicyExists: mysql.Message("Can't create placement policy '%-.192s'; policy exists", nil),
ErrPlacementPolicyNotExists: mysql.Message("Unknown placement policy '%-.192s'", nil),

// TiKV/PD errors.
ErrPDServerTimeout: mysql.Message("PD server timeout", nil),
Expand Down
9 changes: 9 additions & 0 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ func (b *Builder) InitWithOldInfoSchema(oldSchema InfoSchema) *Builder {
b.is.schemaMetaVersion = oldIS.schemaMetaVersion
b.copySchemasMap(oldIS)
b.copyBundlesMap(oldIS)
b.copyPoliciesMap(oldIS)
copy(b.is.sortedTablesBuckets, oldIS.sortedTablesBuckets)
return b
}
Expand All @@ -501,6 +502,13 @@ func (b *Builder) copyBundlesMap(oldIS *infoSchema) {
}
}

func (b *Builder) copyPoliciesMap(oldIS *infoSchema) {
is := b.is
for _, v := range oldIS.PlacementPolicies() {
is.SetPolicy(v)
}
}

// copySchemaTables creates a new schemaTables instance when a table in the database has changed.
// It also does modifications on the new one because old schemaTables must be read-only.
// Note: please make sure the dbName is in lowercase.
Expand Down Expand Up @@ -588,6 +596,7 @@ func NewBuilder(store kv.Storage) *Builder {
store: store,
is: &infoSchema{
schemaMap: map[string]*schemaTables{},
policyMap: map[string]*placement.Policy{},
ruleBundleMap: map[string]*placement.Bundle{},
sortedTablesBuckets: make([]sortedTables, bucketCount),
},
Expand Down
38 changes: 38 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type infoSchema struct {
ruleBundleMutex sync.RWMutex
ruleBundleMap map[string]*placement.Bundle

// policyMap stores all placement policies.
policyMutex sync.RWMutex
policyMap map[string]*placement.Policy

schemaMap map[string]*schemaTables

// sortedTablesBuckets is a slice of sortedTables, a table's bucket index is (tableID % bucketCount).
Expand All @@ -106,6 +110,7 @@ type infoSchema struct {
func MockInfoSchema(tbList []*model.TableInfo) InfoSchema {
result := &infoSchema{}
result.schemaMap = make(map[string]*schemaTables)
result.policyMap = make(map[string]*placement.Policy)
result.ruleBundleMap = make(map[string]*placement.Bundle)
result.sortedTablesBuckets = make([]sortedTables, bucketCount)
dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList}
Expand All @@ -130,6 +135,7 @@ func MockInfoSchema(tbList []*model.TableInfo) InfoSchema {
func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) InfoSchema {
result := &infoSchema{}
result.schemaMap = make(map[string]*schemaTables)
result.policyMap = make(map[string]*placement.Policy)
result.ruleBundleMap = make(map[string]*placement.Bundle)
result.sortedTablesBuckets = make([]sortedTables, bucketCount)
dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList}
Expand Down Expand Up @@ -346,6 +352,38 @@ func HasAutoIncrementColumn(tbInfo *model.TableInfo) (bool, string) {
return false, ""
}

// PolicyByName is used to find the policy.
func (is *infoSchema) PolicyByName(name string) (*placement.Policy, bool) {
is.policyMutex.RLock()
defer is.policyMutex.RUnlock()
t, r := is.policyMap[name]
return t, r
}

// SetPolicy is used to set the policy.
func (is *infoSchema) SetPolicy(policy *placement.Policy) {
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
is.policyMutex.Lock()
defer is.policyMutex.Unlock()
is.policyMap[policy.Name.L] = policy
}

// DeletePolicy is used to delete the policy.
func (is *infoSchema) DeletePolicy(name string) {
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
is.policyMutex.Lock()
defer is.policyMutex.Unlock()
delete(is.policyMap, name)
}

func (is *infoSchema) PlacementPolicies() []*placement.Policy {
is.policyMutex.RLock()
defer is.policyMutex.RUnlock()
policies := make([]*placement.Policy, 0, len(is.policyMap))
for _, policy := range is.policyMap {
policies = append(policies, policy)
}
return policies
}

func (is *infoSchema) BundleByName(name string) (*placement.Bundle, bool) {
is.ruleBundleMutex.RLock()
defer is.ruleBundleMutex.RUnlock()
Expand Down
99 changes: 99 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
Expand All @@ -39,6 +40,7 @@ import (

var (
globalIDMutex sync.Mutex
policyIDMutex sync.Mutex
)

// Meta structure:
Expand Down Expand Up @@ -69,13 +71,20 @@ var (
mRandomIDPrefix = "TARID"
mBootstrapKey = []byte("BootstrapKey")
mSchemaDiffPrefix = "Diff"
mPolicies = []byte("Policies")
mPolicyPrefix = "Policy"
mPolicyGlobalID = []byte("PolicyGlobalID")
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
)

var (
// ErrDBExists is the error for db exists.
ErrDBExists = dbterror.ClassMeta.NewStd(mysql.ErrDBCreateExists)
// ErrDBNotExists is the error for db not exists.
ErrDBNotExists = dbterror.ClassMeta.NewStd(mysql.ErrBadDB)
// ErrPolicyExists is the error for policy exists.
ErrPolicyExists = dbterror.ClassMeta.NewStd(errno.ErrPlacementPolicyExists)
// ErrPolicyNotExists is the error for policy not exists.
ErrPolicyNotExists = dbterror.ClassMeta.NewStd(errno.ErrPlacementPolicyNotExists)
// ErrTableExists is the error for table exists.
ErrTableExists = dbterror.ClassMeta.NewStd(mysql.ErrTableExists)
// ErrTableNotExists is the error for table not exists.
Expand Down Expand Up @@ -144,6 +153,21 @@ func (m *Meta) GetGlobalID() (int64, error) {
return m.txn.GetInt64(mNextGlobalIDKey)
}

func (m *Meta) GenPolicyID() (int64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe generate the id when creating the new policy? a.k.a auto incremental id.

Copy link
Contributor Author

@AilinKid AilinKid Aug 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AilinKid Actually, I mean "remove this API". GenPolicyID is used outside CreatePolicy, the behavior is not controlled by meta. One could just generate a random number other than numbers from GenPolicyID. Is there any advantage with a standalone ID generating method? Otherwise I prefer less/simple/opinionated API.

Not a necessary change request. rest code LGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

policyIDMutex.Lock()
defer policyIDMutex.Unlock()

return m.txn.Inc(mPolicyGlobalID, 1)
}

func (m *Meta) GetPolicyID() (int64, error) {
return m.txn.GetInt64(mPolicyGlobalID)
}

func (m *Meta) policyKey(policyID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mPolicyPrefix, policyID))
}

func (m *Meta) dbKey(dbID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mDBPrefix, dbID))
}
Expand Down Expand Up @@ -267,6 +291,22 @@ func (m *Meta) GenSchemaVersion() (int64, error) {
return m.txn.Inc(mSchemaVersionKey, 1)
}

func (m *Meta) checkPolicyExists(policyKey []byte) error {
v, err := m.txn.HGet(mPolicies, policyKey)
if err == nil && v == nil {
err = ErrPolicyNotExists.GenWithStack("policy doesn't exist")
}
return errors.Trace(err)
}

func (m *Meta) checkPolicyNotExists(policyKey []byte) error {
v, err := m.txn.HGet(mPolicies, policyKey)
if err == nil && v != nil {
err = ErrPolicyExists.GenWithStack("policy already exists")
}
return errors.Trace(err)
}

func (m *Meta) checkDBExists(dbKey []byte) error {
v, err := m.txn.HGet(mDBs, dbKey)
if err == nil && v == nil {
Expand Down Expand Up @@ -299,6 +339,33 @@ func (m *Meta) checkTableNotExists(dbKey []byte, tableKey []byte) error {
return errors.Trace(err)
}

func (m *Meta) CreatePolicy(policy *placement.Policy) error {
policyKey := m.policyKey(policy.ID)

if err := m.checkPolicyNotExists(policyKey); err != nil {
return errors.Trace(err)
}
data, err := json.Marshal(policy)
xhebox marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Trace(err)
}
return m.txn.HSet(mPolicies, policyKey, data)
}

func (m *Meta) UpdatePolicy(policy *placement.Policy) error {
policyKey := m.policyKey(policy.ID)

if err := m.checkPolicyExists(policyKey); err != nil {
return errors.Trace(err)
}

data, err := json.Marshal(policy)
if err != nil {
return errors.Trace(err)
}
return m.txn.HSet(mPolicies, policyKey, data)
}

// CreateDatabase creates a database with db info.
func (m *Meta) CreateDatabase(dbInfo *model.DBInfo) error {
dbKey := m.dbKey(dbInfo.ID)
Expand Down Expand Up @@ -551,6 +618,38 @@ func (m *Meta) GetDatabase(dbID int64) (*model.DBInfo, error) {
return dbInfo, errors.Trace(err)
}

// ListPolicies shows all policies.
func (m *Meta) ListPolicies() ([]*placement.Policy, error) {
res, err := m.txn.HGetAll(mPolicies)
if err != nil {
return nil, errors.Trace(err)
}

policies := make([]*placement.Policy, 0, len(res))
for _, r := range res {
policy := &placement.Policy{}
err = json.Unmarshal(r.Value, policy)
if err != nil {
return nil, errors.Trace(err)
}
policies = append(policies, policy)
}
return policies, nil
}

// GetPolicy gets the database value with ID.
func (m *Meta) GetPolicy(policyID int64) (*placement.Policy, error) {
policyKey := m.policyKey(policyID)
value, err := m.txn.HGet(mPolicies, policyKey)
if err != nil || value == nil {
return nil, errors.Trace(err)
}

policy := &placement.Policy{}
err = json.Unmarshal(value, policy)
return policy, errors.Trace(err)
}

// GetTable gets the table value in database with tableID.
func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
// Check if db exists.
Expand Down
Loading