Skip to content

Commit

Permalink
infoschema: add a simply store for DM's SchemaTracker (#35954)
Browse files Browse the repository at this point in the history
ref #35933
  • Loading branch information
lance6716 authored Jul 6, 2022
1 parent 07f1242 commit 6531bd1
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 0 deletions.
160 changes: 160 additions & 0 deletions infoschema/info_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package infoschema

import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
)

// InfoStore is a simple structure that stores DBInfo and TableInfo. It's modifiable and not thread-safe.
type InfoStore struct {
lowerCaseTableNames int // same as variable lower_case_table_names

dbs map[string]*model.DBInfo
tables map[string]map[string]*model.TableInfo
}

// NewInfoStore creates a InfoStore.
func NewInfoStore(lowerCaseTableNames int) *InfoStore {
return &InfoStore{
lowerCaseTableNames: lowerCaseTableNames,
dbs: map[string]*model.DBInfo{},
tables: map[string]map[string]*model.TableInfo{},
}
}

func (i *InfoStore) ciStr2Key(name model.CIStr) string {
if i.lowerCaseTableNames == 0 {
return name.O
}
return name.L
}

// SchemaByName returns the DBInfo of given name. nil if not found.
func (i *InfoStore) SchemaByName(name model.CIStr) *model.DBInfo {
key := i.ciStr2Key(name)
return i.dbs[key]
}

// PutSchema puts a DBInfo, it will overwrite the old one.
func (i *InfoStore) PutSchema(dbInfo *model.DBInfo) {
key := i.ciStr2Key(dbInfo.Name)
i.dbs[key] = dbInfo
if i.tables[key] == nil {
i.tables[key] = map[string]*model.TableInfo{}
}
}

// DeleteSchema deletes the schema from InfoSchema. Returns true when the schema exists, false otherwise.
func (i *InfoStore) DeleteSchema(name model.CIStr) bool {
key := i.ciStr2Key(name)
_, ok := i.dbs[key]
if !ok {
return false
}
delete(i.dbs, key)
delete(i.tables, key)
return true
}

// TableByName returns the TableInfo. It will also return the error like an infoschema.
func (i *InfoStore) TableByName(schema, table model.CIStr) (*model.TableInfo, error) {
schemaKey := i.ciStr2Key(schema)
tables, ok := i.tables[schemaKey]
if !ok {
return nil, ErrDatabaseNotExists.GenWithStackByArgs(schema)
}

tableKey := i.ciStr2Key(table)
tbl, ok := tables[tableKey]
if !ok {
return nil, ErrTableNotExists.GenWithStackByArgs(schema, table)
}
return tbl, nil
}

// PutTable puts a TableInfo, it will overwrite the old one. If the schema doesn't exist, it will return ErrDatabaseNotExists.
func (i *InfoStore) PutTable(schemaName model.CIStr, tblInfo *model.TableInfo) error {
schemaKey := i.ciStr2Key(schemaName)
tables, ok := i.tables[schemaKey]
if !ok {
return ErrDatabaseNotExists.GenWithStackByArgs(schemaName)
}
tableKey := i.ciStr2Key(tblInfo.Name)
tables[tableKey] = tblInfo
return nil
}

// DeleteTable deletes the TableInfo, it will return ErrDatabaseNotExists or ErrTableNotExists when schema or table does
// not exist.
func (i *InfoStore) DeleteTable(schema, table model.CIStr) error {
schemaKey := i.ciStr2Key(schema)
tables, ok := i.tables[schemaKey]
if !ok {
return ErrDatabaseNotExists.GenWithStackByArgs(schema)
}

tableKey := i.ciStr2Key(table)
_, ok = tables[tableKey]
if !ok {
return ErrTableNotExists.GenWithStackByArgs(schema, table)
}
delete(tables, tableKey)
return nil
}

// InfoStoreAdaptor convert InfoStore to InfoSchema, it only implements a part of InfoSchema interface to be
// used by DDL interface.
// nolint:unused
type InfoStoreAdaptor struct {
InfoSchema
inner *InfoStore
}

// SchemaByName implements the InfoSchema interface.
// nolint:unused
func (i InfoStoreAdaptor) SchemaByName(schema model.CIStr) (*model.DBInfo, bool) {
dbInfo := i.inner.SchemaByName(schema)
return dbInfo, dbInfo != nil
}

// TableExists implements the InfoSchema interface.
// nolint:unused
func (i InfoStoreAdaptor) TableExists(schema, table model.CIStr) bool {
tableInfo, _ := i.inner.TableByName(schema, table)
return tableInfo != nil
}

// TableIsView implements the InfoSchema interface.
// nolint:unused
func (i InfoStoreAdaptor) TableIsView(schema, table model.CIStr) bool {
tableInfo, _ := i.inner.TableByName(schema, table)
if tableInfo == nil {
return false
}
return tableInfo.IsView()
}

// TableByName implements the InfoSchema interface.
// nolint:unused
func (i InfoStoreAdaptor) TableByName(schema, table model.CIStr) (t table.Table, err error) {
tableInfo, err := i.inner.TableByName(schema, table)
if err != nil {
return nil, err
}
return tables.MockTableFromMeta(tableInfo), nil
}
115 changes: 115 additions & 0 deletions infoschema/info_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package infoschema

import (
"testing"

"github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
)

func TestInfoStoreLowerCaseTableNames(t *testing.T) {
dbName := model.NewCIStr("DBName")
lowerDBName := model.NewCIStr("dbname")
tableName := model.NewCIStr("TableName")
lowerTableName := model.NewCIStr("tablename")
dbInfo := &model.DBInfo{Name: dbName}
tableInfo := &model.TableInfo{Name: tableName}

// case-sensitive

is := NewInfoStore(0)
is.PutSchema(dbInfo)
got := is.SchemaByName(dbName)
require.NotNil(t, got)
got = is.SchemaByName(lowerDBName)
require.Nil(t, got)

err := is.PutTable(lowerDBName, tableInfo)
require.True(t, ErrDatabaseNotExists.Equal(err))
err = is.PutTable(dbName, tableInfo)
require.NoError(t, err)
got2, err := is.TableByName(dbName, tableName)
require.NoError(t, err)
require.NotNil(t, got2)
got2, err = is.TableByName(lowerTableName, tableName)
require.True(t, ErrDatabaseNotExists.Equal(err))
require.Nil(t, got2)
got2, err = is.TableByName(dbName, lowerTableName)
require.True(t, ErrTableNotExists.Equal(err))
require.Nil(t, got2)

// compare-insensitive

is = NewInfoStore(2)
is.PutSchema(dbInfo)
got = is.SchemaByName(dbName)
require.NotNil(t, got)
got = is.SchemaByName(lowerDBName)
require.NotNil(t, got)
require.Equal(t, dbName, got.Name)

err = is.PutTable(lowerDBName, tableInfo)
require.NoError(t, err)
got2, err = is.TableByName(dbName, tableName)
require.NoError(t, err)
require.NotNil(t, got2)
got2, err = is.TableByName(dbName, lowerTableName)
require.NoError(t, err)
require.NotNil(t, got2)
require.Equal(t, tableName, got2.Name)
}

func TestInfoStoreDeleteTables(t *testing.T) {
is := NewInfoStore(0)
dbName1 := model.NewCIStr("DBName1")
dbName2 := model.NewCIStr("DBName2")
tableName1 := model.NewCIStr("TableName1")
tableName2 := model.NewCIStr("TableName2")
dbInfo1 := &model.DBInfo{Name: dbName1}
dbInfo2 := &model.DBInfo{Name: dbName2}
tableInfo1 := &model.TableInfo{Name: tableName1}
tableInfo2 := &model.TableInfo{Name: tableName2}

is.PutSchema(dbInfo1)
err := is.PutTable(dbName1, tableInfo1)
require.NoError(t, err)
err = is.PutTable(dbName1, tableInfo2)
require.NoError(t, err)

// db2 not created
ok := is.DeleteSchema(dbName2)
require.False(t, ok)
err = is.PutTable(dbName2, tableInfo1)
require.True(t, ErrDatabaseNotExists.Equal(err))
err = is.DeleteTable(dbName2, tableName1)
require.True(t, ErrDatabaseNotExists.Equal(err))

is.PutSchema(dbInfo2)
err = is.PutTable(dbName2, tableInfo1)
require.NoError(t, err)

err = is.DeleteTable(dbName2, tableName2)
require.True(t, ErrTableNotExists.Equal(err))
err = is.DeleteTable(dbName2, tableName1)
require.NoError(t, err)

// delete db will remove its tables
ok = is.DeleteSchema(dbName1)
require.True(t, ok)
_, err = is.TableByName(dbName1, tableName1)
require.True(t, ErrDatabaseNotExists.Equal(err))
}

0 comments on commit 6531bd1

Please sign in to comment.