Skip to content

Commit

Permalink
lightning: add pre-restore information getter (#35523)
Browse files Browse the repository at this point in the history
ref #35272
  • Loading branch information
dsdashun authored Jul 14, 2022
1 parent bbc2e65 commit 28d25ac
Show file tree
Hide file tree
Showing 15 changed files with 2,472 additions and 648 deletions.
54 changes: 30 additions & 24 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,26 +125,50 @@ type CheckCtx struct {
DBMetas []*mydump.MDDatabaseMeta
}

// TargetInfoGetter defines the interfaces to get target information.
type TargetInfoGetter interface {
// FetchRemoteTableModels obtains the models of all tables given the schema
// name. The returned table info does not need to be precise if the encoder,
// is not requiring them, but must at least fill in the following fields for
// TablesFromMeta to succeed:
// - Name
// - State (must be model.StatePublic)
// - ID
// - Columns
// * Name
// * State (must be model.StatePublic)
// * Offset (must be 0, 1, 2, ...)
// - PKIsHandle (true = do not generate _tidb_rowid)
FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)

// CheckRequirements performs the check whether the backend satisfies the version requirements
CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error
}

// EncodingBuilder consists of operations to handle encoding backend row data formats from source.
type EncodingBuilder interface {
// NewEncoder creates an encoder of a TiDB table.
NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)
// MakeEmptyRows creates an empty collection of encoded rows.
MakeEmptyRows() kv.Rows
}

// AbstractBackend is the abstract interface behind Backend.
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
type AbstractBackend interface {
EncodingBuilder
TargetInfoGetter
// Close the connection to the backend.
Close()

// MakeEmptyRows creates an empty collection of encoded rows.
MakeEmptyRows() kv.Rows

// RetryImportDelay returns the duration to sleep when retrying an import
RetryImportDelay() time.Duration

// ShouldPostProcess returns whether KV-specific post-processing should be
// performed for this backend. Post-processing includes checksum and analyze.
ShouldPostProcess() bool

// NewEncoder creates an encoder of a TiDB table.
NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)

OpenEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error

CloseEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error
Expand All @@ -156,24 +180,6 @@ type AbstractBackend interface {

CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

// CheckRequirements performs the check whether the backend satisfies the
// version requirements
CheckRequirements(ctx context.Context, checkCtx *CheckCtx) error

// FetchRemoteTableModels obtains the models of all tables given the schema
// name. The returned table info does not need to be precise if the encoder,
// is not requiring them, but must at least fill in the following fields for
// TablesFromMeta to succeed:
// - Name
// - State (must be model.StatePublic)
// - ID
// - Columns
// * Name
// * State (must be model.StatePublic)
// * Offset (must be 0, 1, 2, ...)
// - PKIsHandle (true = do not generate _tidb_rowid)
FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)

// FlushEngine ensures all KV pairs written to an open engine has been
// synchronized, such that kill-9'ing Lightning afterwards and resuming from
// checkpoint can recover the exact same content.
Expand Down
227 changes: 142 additions & 85 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,139 @@ type Range struct {
end []byte
}

type encodingBuilder struct {
metrics *metric.Metrics
}

// NewEncodingBuilder creates an KVEncodingBuilder with local backend implementation.
func NewEncodingBuilder(ctx context.Context) backend.EncodingBuilder {
result := new(encodingBuilder)
if m, ok := metric.FromContext(ctx); ok {
result.metrics = m
}
return result
}

// NewEncoder creates a KV encoder.
// It implements the `backend.EncodingBuilder` interface.
func (b *encodingBuilder) NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return kv.NewTableKVEncoder(tbl, options, b.metrics, log.FromContext(ctx))
}

// MakeEmptyRows creates an empty KV rows.
// It implements the `backend.EncodingBuilder` interface.
func (b *encodingBuilder) MakeEmptyRows() kv.Rows {
return kv.MakeRowsFromKvPairs(nil)
}

type targetInfoGetter struct {
tls *common.TLS
targetDBGlue glue.Glue
pdAddr string
}

// NewTargetInfoGetter creates an TargetInfoGetter with local backend implementation.
func NewTargetInfoGetter(tls *common.TLS, g glue.Glue, pdAddr string) backend.TargetInfoGetter {
return &targetInfoGetter{
tls: tls,
targetDBGlue: g,
pdAddr: pdAddr,
}
}

// FetchRemoteTableModels obtains the models of all tables given the schema name.
// It implements the `TargetInfoGetter` interface.
func (g *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
return tikv.FetchRemoteTableModelsFromTLS(ctx, g.tls, schemaName)
}

// CheckRequirements performs the check whether the backend satisfies the version requirements.
// It implements the `TargetInfoGetter` interface.
func (g *targetInfoGetter) CheckRequirements(ctx context.Context, checkCtx *backend.CheckCtx) error {
// TODO: support lightning via SQL
db, _ := g.targetDBGlue.GetDB()
versionStr, err := version.FetchVersion(ctx, db)
if err != nil {
return errors.Trace(err)
}
if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil {
return err
}
if err := tikv.CheckPDVersion(ctx, g.tls, g.pdAddr, localMinPDVersion, localMaxPDVersion); err != nil {
return err
}
if err := tikv.CheckTiKVVersion(ctx, g.tls, g.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil {
return err
}

serverInfo := version.ParseServerInfo(versionStr)
return checkTiFlashVersion(ctx, g.targetDBGlue, checkCtx, *serverInfo.ServerVersion)
}

func checkTiDBVersion(_ context.Context, versionStr string, requiredMinVersion, requiredMaxVersion semver.Version) error {
return version.CheckTiDBVersion(versionStr, requiredMinVersion, requiredMaxVersion)
}

var tiFlashReplicaQuery = "SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TIFLASH_REPLICA WHERE REPLICA_COUNT > 0;"

type tblName struct {
schema string
name string
}

type tblNames []tblName

func (t tblNames) String() string {
var b strings.Builder
b.WriteByte('[')
for i, n := range t {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(common.UniqueTable(n.schema, n.name))
}
b.WriteByte(']')
return b.String()
}

// check TiFlash replicas.
// local backend doesn't support TiFlash before tidb v4.0.5
func checkTiFlashVersion(ctx context.Context, g glue.Glue, checkCtx *backend.CheckCtx, tidbVersion semver.Version) error {
if tidbVersion.Compare(tiFlashMinVersion) >= 0 {
return nil
}

res, err := g.GetSQLExecutor().QueryStringsWithLog(ctx, tiFlashReplicaQuery, "fetch tiflash replica info", log.FromContext(ctx))
if err != nil {
return errors.Annotate(err, "fetch tiflash replica info failed")
}

tiFlashTablesMap := make(map[tblName]struct{}, len(res))
for _, tblInfo := range res {
name := tblName{schema: tblInfo[0], name: tblInfo[1]}
tiFlashTablesMap[name] = struct{}{}
}

tiFlashTables := make(tblNames, 0)
for _, dbMeta := range checkCtx.DBMetas {
for _, tblMeta := range dbMeta.Tables {
if len(tblMeta.DataFiles) == 0 {
continue
}
name := tblName{schema: tblMeta.DB, name: tblMeta.Name}
if _, ok := tiFlashTablesMap[name]; ok {
tiFlashTables = append(tiFlashTables, name)
}
}
}

if len(tiFlashTables) > 0 {
helpInfo := "Please either upgrade TiDB to version >= 4.0.5 or add TiFlash replica after load data."
return errors.Errorf("lightning local backend doesn't support TiFlash in this TiDB version. conflict tables: %s. "+helpInfo, tiFlashTables)
}
return nil
}

type local struct {
engines sync.Map // sync version of map[uuid.UUID]*Engine

Expand Down Expand Up @@ -236,6 +369,9 @@ type local struct {
metrics *metric.Metrics
writeLimiter StoreWriteLimiter
logger log.Logger

encBuilder backend.EncodingBuilder
targetInfoGetter backend.TargetInfoGetter
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
Expand Down Expand Up @@ -344,6 +480,8 @@ func NewLocalBackend(
bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
writeLimiter: writeLimiter,
logger: log.FromContext(ctx),
encBuilder: NewEncodingBuilder(ctx),
targetInfoGetter: NewTargetInfoGetter(tls, g, cfg.TiDB.PdAddr),
}
if m, ok := metric.FromContext(ctx); ok {
local.metrics = m
Expand Down Expand Up @@ -1652,100 +1790,19 @@ func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) err
}

func (local *local) CheckRequirements(ctx context.Context, checkCtx *backend.CheckCtx) error {
// TODO: support lightning via SQL
db, _ := local.g.GetDB()
versionStr, err := version.FetchVersion(ctx, db)
if err != nil {
return errors.Trace(err)
}
if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil {
return err
}
if err := tikv.CheckPDVersion(ctx, local.tls, local.pdAddr, localMinPDVersion, localMaxPDVersion); err != nil {
return err
}
if err := tikv.CheckTiKVVersion(ctx, local.tls, local.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil {
return err
}

serverInfo := version.ParseServerInfo(versionStr)
return checkTiFlashVersion(ctx, local.g, checkCtx, *serverInfo.ServerVersion)
}

func checkTiDBVersion(_ context.Context, versionStr string, requiredMinVersion, requiredMaxVersion semver.Version) error {
return version.CheckTiDBVersion(versionStr, requiredMinVersion, requiredMaxVersion)
}

var tiFlashReplicaQuery = "SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TIFLASH_REPLICA WHERE REPLICA_COUNT > 0;"

type tblName struct {
schema string
name string
}

type tblNames []tblName

func (t tblNames) String() string {
var b strings.Builder
b.WriteByte('[')
for i, n := range t {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(common.UniqueTable(n.schema, n.name))
}
b.WriteByte(']')
return b.String()
}

// check TiFlash replicas.
// local backend doesn't support TiFlash before tidb v4.0.5
func checkTiFlashVersion(ctx context.Context, g glue.Glue, checkCtx *backend.CheckCtx, tidbVersion semver.Version) error {
if tidbVersion.Compare(tiFlashMinVersion) >= 0 {
return nil
}

res, err := g.GetSQLExecutor().QueryStringsWithLog(ctx, tiFlashReplicaQuery, "fetch tiflash replica info", log.FromContext(ctx))
if err != nil {
return errors.Annotate(err, "fetch tiflash replica info failed")
}

tiFlashTablesMap := make(map[tblName]struct{}, len(res))
for _, tblInfo := range res {
name := tblName{schema: tblInfo[0], name: tblInfo[1]}
tiFlashTablesMap[name] = struct{}{}
}

tiFlashTables := make(tblNames, 0)
for _, dbMeta := range checkCtx.DBMetas {
for _, tblMeta := range dbMeta.Tables {
if len(tblMeta.DataFiles) == 0 {
continue
}
name := tblName{schema: tblMeta.DB, name: tblMeta.Name}
if _, ok := tiFlashTablesMap[name]; ok {
tiFlashTables = append(tiFlashTables, name)
}
}
}

if len(tiFlashTables) > 0 {
helpInfo := "Please either upgrade TiDB to version >= 4.0.5 or add TiFlash replica after load data."
return errors.Errorf("lightning local backend doesn't support TiFlash in this TiDB version. conflict tables: %s. "+helpInfo, tiFlashTables)
}
return nil
return local.targetInfoGetter.CheckRequirements(ctx, checkCtx)
}

func (local *local) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
return tikv.FetchRemoteTableModelsFromTLS(ctx, local.tls, schemaName)
return local.targetInfoGetter.FetchRemoteTableModels(ctx, schemaName)
}

func (local *local) MakeEmptyRows() kv.Rows {
return kv.MakeRowsFromKvPairs(nil)
return local.encBuilder.MakeEmptyRows()
}

func (local *local) NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return kv.NewTableKVEncoder(tbl, options, local.metrics, log.FromContext(ctx))
return local.encBuilder.NewEncoder(ctx, tbl, options)
}

func engineSSTDir(storeDir string, engineUUID uuid.UUID) string {
Expand Down
Loading

0 comments on commit 28d25ac

Please sign in to comment.