Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

meta: support storing placement policy into meta #27251

Merged
merged 36 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c5efcdd
support storing placement policy in meta
AilinKid Aug 16, 2021
6ebb9f7
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 16, 2021
f982dcb
solve the import problem
AilinKid Aug 16, 2021
02db37d
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 17, 2021
fa317e4
address commnet
AilinKid Aug 17, 2021
a434c68
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 17, 2021
1ded57e
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 17, 2021
91f1d79
add magic number for policy
AilinKid Aug 17, 2021
643499d
.
AilinKid Aug 17, 2021
e9c33de
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 17, 2021
842a6b6
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 17, 2021
ff7dcf0
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 18, 2021
ca1312e
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 18, 2021
bd19369
address comment
AilinKid Aug 18, 2021
3896af7
address comment
AilinKid Aug 18, 2021
70aab7a
address comment
AilinKid Aug 18, 2021
bfcbcf4
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 18, 2021
2e64f19
error
AilinKid Aug 18, 2021
d96affa
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 18, 2021
9c931eb
.
AilinKid Aug 18, 2021
6c977aa
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 18, 2021
ab6720b
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 19, 2021
bcb66cb
address comment
AilinKid Aug 19, 2021
199c22a
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 19, 2021
ffb8605
address comment
AilinKid Aug 20, 2021
1815aca
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 20, 2021
e23ea18
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 20, 2021
3be7de8
address comment
AilinKid Aug 20, 2021
dae2b36
address comment
AilinKid Aug 20, 2021
2c7e5a3
address comment
AilinKid Aug 20, 2021
5dd8798
fix test
AilinKid Aug 20, 2021
7d5a966
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 20, 2021
92232f8
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 21, 2021
f1f3924
change typeUnknow to 0
AilinKid Aug 21, 2021
9df17d0
add magic byte compatibility check
AilinKid Aug 23, 2021
d318359
Merge branch 'master' into support-store-policy-in-meta
AilinKid Aug 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,8 @@ const (
ErrDDLReorgElementNotExist = 8235
ErrPlacementPolicyCheck = 8236
ErrInvalidAttributesSpec = 8237
ErrPlacementPolicyExists = 8238
ErrPlacementPolicyNotExists = 8239

// TiKV/PD/TiFlash errors.
ErrPDServerTimeout = 9001
Expand Down
12 changes: 7 additions & 5 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,11 +1050,13 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrPartitionStatsMissing: mysql.Message("Build table: %s global-level stats failed due to missing partition-level stats", nil),
ErrNotSupportedWithSem: mysql.Message("Feature '%s' is not supported when security enhanced mode is enabled", nil),

ErrInvalidPlacementSpec: mysql.Message("Invalid placement policy '%s': %s", nil),
ErrPlacementPolicyCheck: mysql.Message("Placement policy didn't meet the constraint, reason: %s", nil),
ErrMultiStatementDisabled: mysql.Message("client has multi-statement capability disabled. Run SET GLOBAL tidb_multi_statement_mode='ON' after you understand the security risk", nil),
ErrAsOf: mysql.Message("invalid as of timestamp: %s", nil),
ErrInvalidAttributesSpec: mysql.Message("Invalid attributes '%s': %s", nil),
ErrInvalidPlacementSpec: mysql.Message("Invalid placement policy '%s': %s", nil),
ErrPlacementPolicyCheck: mysql.Message("Placement policy didn't meet the constraint, reason: %s", nil),
ErrMultiStatementDisabled: mysql.Message("client has multi-statement capability disabled. Run SET GLOBAL tidb_multi_statement_mode='ON' after you understand the security risk", nil),
ErrAsOf: mysql.Message("invalid as of timestamp: %s", nil),
ErrInvalidAttributesSpec: mysql.Message("Invalid attributes '%s': %s", nil),
ErrPlacementPolicyExists: mysql.Message("Can't create placement policy '%-.192s'; policy exists", nil),
ErrPlacementPolicyNotExists: mysql.Message("Unknown placement policy '%-.192s'", nil),

// TiKV/PD errors.
ErrPDServerTimeout: mysql.Message("PD server timeout", nil),
Expand Down
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,16 @@ error = '''
DDL reorg element does not exist
'''

["meta:8238"]
error = '''
Can't create placement policy '%-.192s'; policy exists
'''

["meta:8239"]
error = '''
Unknown placement policy '%-.192s'
'''

["planner:1044"]
error = '''
Access denied for user '%-.48s'@'%-.64s' to database '%-.192s'
Expand Down
12 changes: 12 additions & 0 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/placementpolicy"
)

// Builder builds a new InfoSchema.
Expand Down Expand Up @@ -485,6 +486,7 @@ func (b *Builder) InitWithOldInfoSchema(oldSchema InfoSchema) *Builder {
b.is.schemaMetaVersion = oldIS.schemaMetaVersion
b.copySchemasMap(oldIS)
b.copyBundlesMap(oldIS)
b.copyPoliciesMap(oldIS)
copy(b.is.sortedTablesBuckets, oldIS.sortedTablesBuckets)
return b
}
Expand All @@ -502,6 +504,15 @@ func (b *Builder) copyBundlesMap(oldIS *infoSchema) {
}
}

func (b *Builder) copyPoliciesMap(oldIS *infoSchema) {
is := b.is
is.policyMutex.Lock()
defer is.policyMutex.Unlock()
for _, v := range oldIS.PlacementPolicies() {
is.policyMap[v.Name.L] = v
}
}

// copySchemaTables creates a new schemaTables instance when a table in the database has changed.
// It also does modifications on the new one because old schemaTables must be read-only.
// Note: please make sure the dbName is in lowercase.
Expand Down Expand Up @@ -589,6 +600,7 @@ func NewBuilder(store kv.Storage) *Builder {
store: store,
is: &infoSchema{
schemaMap: map[string]*schemaTables{},
policyMap: map[string]*placementpolicy.PolicyInfo{},
ruleBundleMap: map[string]*placement.Bundle{},
sortedTablesBuckets: make([]sortedTables, bucketCount),
},
Expand Down
25 changes: 25 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/placementpolicy"
)

// InfoSchema is the interface used to retrieve the schema information.
Expand Down Expand Up @@ -94,6 +95,10 @@ type infoSchema struct {
ruleBundleMutex sync.RWMutex
ruleBundleMap map[string]*placement.Bundle

// policyMap stores all placement policies.
policyMutex sync.RWMutex
policyMap map[string]*placementpolicy.PolicyInfo

schemaMap map[string]*schemaTables

// sortedTablesBuckets is a slice of sortedTables, a table's bucket index is (tableID % bucketCount).
Expand All @@ -107,6 +112,7 @@ type infoSchema struct {
func MockInfoSchema(tbList []*model.TableInfo) InfoSchema {
result := &infoSchema{}
result.schemaMap = make(map[string]*schemaTables)
result.policyMap = make(map[string]*placementpolicy.PolicyInfo)
result.ruleBundleMap = make(map[string]*placement.Bundle)
result.sortedTablesBuckets = make([]sortedTables, bucketCount)
dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList}
Expand All @@ -131,6 +137,7 @@ func MockInfoSchema(tbList []*model.TableInfo) InfoSchema {
func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) InfoSchema {
result := &infoSchema{}
result.schemaMap = make(map[string]*schemaTables)
result.policyMap = make(map[string]*placementpolicy.PolicyInfo)
result.ruleBundleMap = make(map[string]*placement.Bundle)
result.sortedTablesBuckets = make([]sortedTables, bucketCount)
dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList}
Expand Down Expand Up @@ -347,6 +354,24 @@ func HasAutoIncrementColumn(tbInfo *model.TableInfo) (bool, string) {
return false, ""
}

// PolicyByName is used to find the policy.
func (is *infoSchema) PolicyByName(name string) (*placementpolicy.PolicyInfo, bool) {
is.policyMutex.RLock()
defer is.policyMutex.RUnlock()
t, r := is.policyMap[name]
return t, r
}

func (is *infoSchema) PlacementPolicies() []*placementpolicy.PolicyInfo {
is.policyMutex.RLock()
defer is.policyMutex.RUnlock()
policies := make([]*placementpolicy.PolicyInfo, 0, len(is.policyMap))
for _, policy := range is.policyMap {
policies = append(policies, policy)
}
return policies
}

func (is *infoSchema) BundleByName(name string) (*placement.Bundle, bool) {
is.ruleBundleMutex.RLock()
defer is.ruleBundleMutex.RUnlock()
Expand Down
155 changes: 155 additions & 0 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ import (
"github.com/pingcap/tidb/structure"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
placement "github.com/pingcap/tidb/util/placementpolicy"
"go.uber.org/zap"
)

var (
globalIDMutex sync.Mutex
policyIDMutex sync.Mutex
)

// Meta structure:
Expand Down Expand Up @@ -70,13 +72,36 @@ var (
mRandomIDPrefix = "TARID"
mBootstrapKey = []byte("BootstrapKey")
mSchemaDiffPrefix = "Diff"
mPolicies = []byte("Policies")
mPolicyPrefix = "Policy"
mPolicyGlobalID = []byte("PolicyGlobalID")
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
mPolicyMagicByte = CurrentMagicByteVer
)

const (
// CurrentMagicByteVer is the current magic byte version, used for future meta compatibility.
CurrentMagicByteVer byte = 0x00
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
// PolicyMagicByte handler
// 0x00 - 0x3F: Json Handler
// 0x40 - 0x7F: Reserved
// 0x80 - 0xBF: Reserved
// 0xC0 - 0xFF: Reserved

// type means how to handle the serialized data.
typeUnknown int = 0
typeJSON int = 1
// todo: customized handler.
)

var (
// ErrDBExists is the error for db exists.
ErrDBExists = dbterror.ClassMeta.NewStd(mysql.ErrDBCreateExists)
// ErrDBNotExists is the error for db not exists.
ErrDBNotExists = dbterror.ClassMeta.NewStd(mysql.ErrBadDB)
// ErrPolicyExists is the error for policy exists.
ErrPolicyExists = dbterror.ClassMeta.NewStd(errno.ErrPlacementPolicyExists)
// ErrPolicyNotExists is the error for policy not exists.
ErrPolicyNotExists = dbterror.ClassMeta.NewStd(errno.ErrPlacementPolicyNotExists)
// ErrTableExists is the error for table exists.
ErrTableExists = dbterror.ClassMeta.NewStd(mysql.ErrTableExists)
// ErrTableNotExists is the error for table not exists.
Expand Down Expand Up @@ -145,6 +170,15 @@ func (m *Meta) GetGlobalID() (int64, error) {
return m.txn.GetInt64(mNextGlobalIDKey)
}

// GetPolicyID gets current policy global id.
func (m *Meta) GetPolicyID() (int64, error) {
return m.txn.GetInt64(mPolicyGlobalID)
}

func (m *Meta) policyKey(policyID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mPolicyPrefix, policyID))
}

func (m *Meta) dbKey(dbID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mDBPrefix, dbID))
}
Expand Down Expand Up @@ -268,6 +302,22 @@ func (m *Meta) GenSchemaVersion() (int64, error) {
return m.txn.Inc(mSchemaVersionKey, 1)
}

func (m *Meta) checkPolicyExists(policyKey []byte) error {
v, err := m.txn.HGet(mPolicies, policyKey)
if err == nil && v == nil {
err = ErrPolicyNotExists.GenWithStack("policy doesn't exist")
}
return errors.Trace(err)
}

func (m *Meta) checkPolicyNotExists(policyKey []byte) error {
v, err := m.txn.HGet(mPolicies, policyKey)
if err == nil && v != nil {
err = ErrPolicyExists.GenWithStack("policy already exists")
}
return errors.Trace(err)
}

func (m *Meta) checkDBExists(dbKey []byte) error {
v, err := m.txn.HGet(mDBs, dbKey)
if err == nil && v == nil {
Expand Down Expand Up @@ -300,6 +350,46 @@ func (m *Meta) checkTableNotExists(dbKey []byte, tableKey []byte) error {
return errors.Trace(err)
}

// CreatePolicy creates a policy.
func (m *Meta) CreatePolicy(policy *placement.PolicyInfo) error {
if policy.ID != 0 {
policyKey := m.policyKey(policy.ID)
if err := m.checkPolicyNotExists(policyKey); err != nil {
return errors.Trace(err)
}
} else {
// Autofill the policy ID.
policyIDMutex.Lock()
genID, err := m.txn.Inc(mPolicyGlobalID, 1)
if err != nil {
return errors.Trace(err)
}
policyIDMutex.Unlock()
policy.ID = genID
}
policyKey := m.policyKey(policy.ID)
data, err := json.Marshal(policy)
xhebox marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Trace(err)
}
return m.txn.HSet(mPolicies, policyKey, attachMagicByte(data))
}

// UpdatePolicy updates a policy.
func (m *Meta) UpdatePolicy(policy *placement.PolicyInfo) error {
policyKey := m.policyKey(policy.ID)

if err := m.checkPolicyExists(policyKey); err != nil {
return errors.Trace(err)
}

data, err := json.Marshal(policy)
if err != nil {
return errors.Trace(err)
}
return m.txn.HSet(mPolicies, policyKey, attachMagicByte(data))
}

// CreateDatabase creates a database with db info.
func (m *Meta) CreateDatabase(dbInfo *model.DBInfo) error {
dbKey := m.dbKey(dbInfo.ID)
Expand Down Expand Up @@ -552,6 +642,71 @@ func (m *Meta) GetDatabase(dbID int64) (*model.DBInfo, error) {
return dbInfo, errors.Trace(err)
}

// ListPolicies shows all policies.
func (m *Meta) ListPolicies() ([]*placement.PolicyInfo, error) {
res, err := m.txn.HGetAll(mPolicies)
if err != nil {
return nil, errors.Trace(err)
}

policies := make([]*placement.PolicyInfo, 0, len(res))
for _, r := range res {
value, err := detachMagicByte(r.Value)
if err != nil {
return nil, errors.Trace(err)
}
policy := &placement.PolicyInfo{}
err = json.Unmarshal(value, policy)
if err != nil {
return nil, errors.Trace(err)
}
policies = append(policies, policy)
}
return policies, nil
}

// GetPolicy gets the database value with ID.
func (m *Meta) GetPolicy(policyID int64) (*placement.PolicyInfo, error) {
policyKey := m.policyKey(policyID)
value, err := m.txn.HGet(mPolicies, policyKey)
if err != nil || value == nil {
return nil, errors.Trace(err)
}
value, err = detachMagicByte(value)
if err != nil {
return nil, errors.Trace(err)
}

policy := &placement.PolicyInfo{}
err = json.Unmarshal(value, policy)
return policy, errors.Trace(err)
}

func attachMagicByte(data []byte) []byte {
data = append(data, 0)
copy(data[1:], data)
data[0] = mPolicyMagicByte
return data
}

func detachMagicByte(value []byte) ([]byte, error) {
magic, data := value[:1], value[1:]
switch whichMagicType(magic[0]) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain how should we use the magic byte in the future? Does it means if the byte <= 0x3F they must be compatible each other? For example 0x3F can read the data with byte 0x0 and 0x0 can read the data with byte 0x3F too. If we want to make an incompatible version, the byte must be upgraded to > 0x3F? And it can still be in json format, but ... incompatible.

Copy link
Contributor Author

@AilinKid AilinKid Aug 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This point comes from wanghe. (like magic byte in Java class file)

Appending fields is naturally compatible with old version, so we don't have to change magic byte. For extreme case, modifying a old field will cause misunderstanding between versions, in which we should change the magic byte + 0x01, that told new / old version to handling in a new way / just throw a error, but it should be still in the json handler-range.

In other extreme case, one day we may use msgpack/yaml/other-serialization-format instead, where we should change the magic byte to other reserved corresponding range.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, but for the code:

	case typeJSON:
		return data, nil

It does not make any error only if the magic number < 0x3F. Does it mean new versions must be compatible with old version if their magic numbers are in the same range.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, nice catch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

case typeJSON:
return data, nil
default:
//
return nil, errors.New("unknown magic type handling module")
}
}

func whichMagicType(b byte) int {
if b <= 0x3F {
return typeJSON
}
return typeUnknown
}

// GetTable gets the table value in database with tableID.
func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
// Check if db exists.
Expand Down
Loading