Skip to content

Commit

Permalink
Preparation for add index acceleration, handle 'import cycle'(pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin2037 committed Jul 25, 2022
1 parent 5484002 commit 705793d
Show file tree
Hide file tree
Showing 29 changed files with 1,243 additions and 1,034 deletions.
3 changes: 2 additions & 1 deletion br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/conn"
butil "github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
Expand Down Expand Up @@ -607,7 +608,7 @@ func (bc *Client) BackupRange(
zap.Uint32("concurrency", req.Concurrency))

var allStores []*metapb.Store
allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), butil.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
Expand Down
65 changes: 7 additions & 58 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/conn/util"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/engine"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
Expand Down Expand Up @@ -68,68 +68,17 @@ type Mgr struct {
*utils.StoreManager
}

// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
// store (e.g. TiFlash store) is found.
type StoreBehavior uint8

const (
// ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is
// found to be a TiFlash node.
ErrorOnTiFlash StoreBehavior = 0
// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
// be a TiFlash node.
SkipTiFlash StoreBehavior = 1
// TiFlashOnly caused GetAllTiKVStores to skip the store which is not a
// TiFlash node.
TiFlashOnly StoreBehavior = 2
)

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, errors.Trace(err)
}

// filter out all stores which are TiFlash.
j := 0
for _, store := range stores {
isTiFlash := false
if engine.IsTiFlash(store) {
if storeBehavior == SkipTiFlash {
continue
} else if storeBehavior == ErrorOnTiFlash {
return nil, errors.Annotatef(berrors.ErrPDInvalidResponse,
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
}
isTiFlash = true
}
if !isTiFlash && storeBehavior == TiFlashOnly {
continue
}
stores[j] = store
j++
}
return stores[:j], nil
}

func GetAllTiKVStoresWithRetry(ctx context.Context,
pdClient pd.Client,
storeBehavior StoreBehavior,
storeBehavior util.StoreBehavior,
) ([]*metapb.Store, error) {
stores := make([]*metapb.Store, 0)
var err error

errRetry := utils.WithRetry(
ctx,
func() error {
stores, err = GetAllTiKVStores(ctx, pdClient, storeBehavior)
stores, err = util.GetAllTiKVStores(ctx, pdClient, storeBehavior)
failpoint.Inject("hint-GetAllTiKVStores-error", func(val failpoint.Value) {
if val.(bool) {
logutil.CL(ctx).Debug("failpoint hint-GetAllTiKVStores-error injected.")
Expand All @@ -154,9 +103,9 @@ func GetAllTiKVStoresWithRetry(ctx context.Context,

func checkStoresAlive(ctx context.Context,
pdclient pd.Client,
storeBehavior StoreBehavior) error {
storeBehavior util.StoreBehavior) error {
// Check live tikv.
stores, err := GetAllTiKVStores(ctx, pdclient, storeBehavior)
stores, err := util.GetAllTiKVStores(ctx, pdclient, storeBehavior)
if err != nil {
log.Error("fail to get store", zap.Error(err))
return errors.Trace(err)
Expand Down Expand Up @@ -184,7 +133,7 @@ func NewMgr(
tlsConf *tls.Config,
securityOption pd.SecurityOption,
keepalive keepalive.ClientParameters,
storeBehavior StoreBehavior,
storeBehavior util.StoreBehavior,
checkRequirements bool,
needDomain bool,
versionCheckerType VersionCheckerType,
Expand Down Expand Up @@ -368,7 +317,7 @@ func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Cli

// GetConfigFromTiKV get configs from all alive tikv stores.
func (mgr *Mgr) GetConfigFromTiKV(ctx context.Context, cli *http.Client, fn func(*http.Response) error) error {
allStores, err := GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), SkipTiFlash)
allStores, err := GetAllTiKVStoresWithRetry(ctx, mgr.GetPDClient(), util.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
Expand Down
27 changes: 14 additions & 13 deletions br/pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/conn/util"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestGetAllTiKVStoresWithRetryCancel(t *testing.T) {
Stores: stores,
}

_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash)
_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.Error(t, err)
require.Equal(t, codes.Canceled, status.Code(errors.Cause(err)))
}
Expand Down Expand Up @@ -96,7 +97,7 @@ func TestGetAllTiKVStoresWithUnknown(t *testing.T) {
Stores: stores,
}

_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash)
_, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.Error(t, err)
require.Equal(t, codes.Unknown, status.Code(errors.Cause(err)))
}
Expand Down Expand Up @@ -151,50 +152,50 @@ func TestCheckStoresAlive(t *testing.T) {
Stores: stores,
}

kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, SkipTiFlash)
kvStores, err := GetAllTiKVStoresWithRetry(ctx, fpdc, util.SkipTiFlash)
require.NoError(t, err)
require.Len(t, kvStores, 2)
require.Equal(t, stores[2:], kvStores)

err = checkStoresAlive(ctx, fpdc, SkipTiFlash)
err = checkStoresAlive(ctx, fpdc, util.SkipTiFlash)
require.NoError(t, err)
}

func TestGetAllTiKVStores(t *testing.T) {
testCases := []struct {
stores []*metapb.Store
storeBehavior StoreBehavior
storeBehavior util.StoreBehavior
expectedStores map[uint64]int
expectedError string
}{
{
stores: []*metapb.Store{
{Id: 1},
},
storeBehavior: SkipTiFlash,
storeBehavior: util.SkipTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
},
storeBehavior: ErrorOnTiFlash,
storeBehavior: util.ErrorOnTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
},
storeBehavior: SkipTiFlash,
storeBehavior: util.SkipTiFlash,
expectedStores: map[uint64]int{1: 1},
},
{
stores: []*metapb.Store{
{Id: 1},
{Id: 2, Labels: []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}}},
},
storeBehavior: ErrorOnTiFlash,
storeBehavior: util.ErrorOnTiFlash,
expectedError: "^cannot restore to a cluster with active TiFlash stores",
},
{
Expand All @@ -206,7 +207,7 @@ func TestGetAllTiKVStores(t *testing.T) {
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
storeBehavior: SkipTiFlash,
storeBehavior: util.SkipTiFlash,
expectedStores: map[uint64]int{1: 1, 3: 1, 4: 1, 6: 1},
},
{
Expand All @@ -218,7 +219,7 @@ func TestGetAllTiKVStores(t *testing.T) {
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
storeBehavior: ErrorOnTiFlash,
storeBehavior: util.ErrorOnTiFlash,
expectedError: "^cannot restore to a cluster with active TiFlash stores",
},
{
Expand All @@ -230,14 +231,14 @@ func TestGetAllTiKVStores(t *testing.T) {
{Id: 5, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tikv"}, {Key: "engine", Value: "tiflash"}}},
{Id: 6, Labels: []*metapb.StoreLabel{{Key: "else", Value: "tiflash"}, {Key: "engine", Value: "tikv"}}},
},
storeBehavior: TiFlashOnly,
storeBehavior: util.TiFlashOnly,
expectedStores: map[uint64]int{2: 1, 5: 1},
},
}

for _, testCase := range testCases {
pdClient := utils.FakePDClient{Stores: testCase.stores}
stores, err := GetAllTiKVStores(context.Background(), pdClient, testCase.storeBehavior)
stores, err := util.GetAllTiKVStores(context.Background(), pdClient, testCase.storeBehavior)
if len(testCase.expectedError) != 0 {
require.Error(t, err)
require.Regexp(t, testCase.expectedError, err.Error())
Expand Down
62 changes: 62 additions & 0 deletions br/pkg/conn/util/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package util

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/util/engine"
pd "github.com/tikv/pd/client"
)

// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
// store (e.g. TiFlash store) is found.
type StoreBehavior uint8

const (
// ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is
// found to be a TiFlash node.
ErrorOnTiFlash StoreBehavior = 0
// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
// be a TiFlash node.
SkipTiFlash StoreBehavior = 1
// TiFlashOnly caused GetAllTiKVStores to skip the store which is not a
// TiFlash node.
TiFlashOnly StoreBehavior = 2
)

// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
func GetAllTiKVStores(
ctx context.Context,
pdClient pd.Client,
storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
// get all live stores.
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return nil, errors.Trace(err)
}

// filter out all stores which are TiFlash.
j := 0
for _, store := range stores {
isTiFlash := false
if engine.IsTiFlash(store) {
if storeBehavior == SkipTiFlash {
continue
} else if storeBehavior == ErrorOnTiFlash {
return nil, errors.Annotatef(berrors.ErrPDInvalidResponse,
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
}
isTiFlash = true
}
if !isTiFlash && storeBehavior == TiFlashOnly {
continue
}
stores[j] = store
j++
}
return stores[:j], nil
}
5 changes: 3 additions & 2 deletions br/pkg/lightning/backend/kv/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kv
package kv_test

import (
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
)

func TestSession(t *testing.T) {
session := newSession(&SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
session := kv.NewSession(&kv.SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
_, err := session.Txn(true)
require.NoError(t, err)
}
30 changes: 28 additions & 2 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql" //nolint: goimports
// Import tidb/planner/core to initialize expression.RewriteAstExpr
_ "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand Down Expand Up @@ -65,6 +64,10 @@ type tableKVEncoder struct {
metrics *metric.Metrics
}

func GetSession4test(encoder Encoder) sessionctx.Context {
return encoder.(*tableKVEncoder).se
}

func NewTableKVEncoder(
tbl table.Table,
options *SessionOptions,
Expand Down Expand Up @@ -322,6 +325,14 @@ func KvPairsFromRows(rows Rows) []common.KvPair {
return rows.(*KvPairs).pairs
}

// KvPairsFromRow converts a Rows instance constructed from MakeRowsFromKvPairs
// back into a slice of KvPair. This method panics if the Rows is not
// constructed in such way.
// nolint:golint // kv.KvPairsFromRow sounds good.
func KvPairsFromRow(rows Row) []common.KvPair {
return rows.(*KvPairs).pairs
}

func evaluateGeneratedColumns(se *session, record []types.Datum, cols []*table.Column, genCols []genCol) (err error, errCol *model.ColumnInfo) {
mutRow := chunk.MutRowFromDatums(record)
for _, gc := range genCols {
Expand Down Expand Up @@ -449,6 +460,21 @@ func isPKCol(colInfo *model.ColumnInfo) bool {
return mysql.HasPriKeyFlag(colInfo.GetFlag())
}

// GetEncoderAutoIDFn return Auto increment id.
func GetEncoderAutoIDFn(encoder Encoder, id int64) int64 {
return encoder.(*tableKVEncoder).autoIDFn(id)
}

// GetEncoderSe return session.
func GetEncoderSe(encoder Encoder) *session {
return encoder.(*tableKVEncoder).se
}

// GetActualDatum export getActualDatum function.
func GetActualDatum(encoder Encoder, rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
return encoder.(*tableKVEncoder).getActualDatum(70, 0, inputDatum)
}

func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
var (
value types.Datum
Expand Down
Loading

0 comments on commit 705793d

Please sign in to comment.