Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Some SwitchMode improvements #287

Merged
merged 2 commits into from
Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func main() {

func run() error {
var (
compact *bool
compact, flagFetchMode *bool
mode, flagImportEngine, flagCleanupEngine *string
cpRemove, cpErrIgnore, cpErrDestroy, cpDump *string

Expand All @@ -51,6 +51,7 @@ func run() error {
globalCfg := config.Must(config.LoadGlobalConfig(os.Args[1:], func(fs *flag.FlagSet) {
compact = fs.Bool("compact", false, "do manual compaction on the target cluster")
mode = fs.String("switch-mode", "", "switch tikv into import mode or normal mode, values can be ['import', 'normal']")
flagFetchMode = fs.Bool("fetch-mode", false, "obtain the current mode of every tikv in the cluster")

flagImportEngine = fs.String("import-engine", "", "manually import a closed engine (value can be '`db`.`table`:123' or a UUID")
flagCleanupEngine = fs.String("cleanup-engine", "", "manually delete a closed engine")
Expand Down Expand Up @@ -84,6 +85,9 @@ func run() error {
if *compact {
return errors.Trace(compactCluster(ctx, cfg, tls))
}
if *flagFetchMode {
return errors.Trace(fetchMode(ctx, cfg, tls))
}
if len(*mode) != 0 {
return errors.Trace(switchMode(ctx, cfg, tls, *mode))
}
Expand Down Expand Up @@ -143,6 +147,23 @@ func switchMode(ctx context.Context, cfg *config.Config, tls *common.TLS, mode s
)
}

func fetchMode(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
return kv.ForAllStores(
ctx,
tls.WithHost(cfg.TiDB.PdAddr),
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
mode, err := kv.FetchMode(c, tls, store.Address)
if err != nil {
fmt.Fprintf(os.Stderr, "%-30s | Error: %v\n", store.Address, err)
} else {
fmt.Fprintf(os.Stderr, "%-30s | %s mode\n", store.Address, mode)
}
return nil
},
)
}

func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
if err != nil {
Expand Down
51 changes: 47 additions & 4 deletions lightning/backend/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@ package backend

import (
"context"
"regexp"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/log"
Expand Down Expand Up @@ -124,28 +128,67 @@ func ForAllStores(
return eg.Wait()
}

func ignoreUnimplementedError(err error, logger log.Logger) error {
if status.Code(err) == codes.Unimplemented {
logger.Debug("skipping potentially TiFlash store")
return nil
}
return errors.Trace(err)
}

// SwitchMode changes the TiKV node at the given address to a particular mode.
func SwitchMode(ctx context.Context, tls *common.TLS, tikvAddr string, mode import_sstpb.SwitchMode) error {
task := log.With(zap.Stringer("mode", mode)).Begin(zap.DebugLevel, "switch mode")
task := log.With(zap.Stringer("mode", mode), zap.String("tikv", tikvAddr)).Begin(zap.DebugLevel, "switch mode")
err := withTiKVConnection(ctx, tls, tikvAddr, func(client import_sstpb.ImportSSTClient) error {
_, err := client.SwitchMode(ctx, &import_sstpb.SwitchModeRequest{
Mode: mode,
})
return errors.Trace(err)
return ignoreUnimplementedError(err, task.Logger)
})
task.End(zap.WarnLevel, err)
return err
}

// Compact performs a leveled compaction with the given minimum level.
func Compact(ctx context.Context, tls *common.TLS, tikvAddr string, level int32) error {
task := log.With(zap.Int32("level", level)).Begin(zap.InfoLevel, "compact cluster")
task := log.With(zap.Int32("level", level), zap.String("tikv", tikvAddr)).Begin(zap.InfoLevel, "compact cluster")
err := withTiKVConnection(ctx, tls, tikvAddr, func(client import_sstpb.ImportSSTClient) error {
_, err := client.Compact(ctx, &import_sstpb.CompactRequest{
OutputLevel: level,
})
return errors.Trace(err)
return ignoreUnimplementedError(err, task.Logger)
})
task.End(zap.ErrorLevel, err)
return err
}

var fetchModeRegexp = regexp.MustCompile(`\btikv_config_rocksdb\{cf="default",name="hard_pending_compaction_bytes_limit"\} ([^\n]+)`)

// FetchMode obtains the import mode status of the TiKV node.
func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error) {
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
if err != nil {
return 0, err
}
defer conn.Close()

client := debugpb.NewDebugClient(conn)
resp, err := client.GetMetrics(ctx, &debugpb.GetMetricsRequest{All: false})
if err != nil {
return 0, errors.Trace(err)
}
return FetchModeFromMetrics(resp.Prometheus)
}

// FetchMode obtains the import mode status from the Prometheus metrics of a TiKV node.
func FetchModeFromMetrics(metrics string) (import_sstpb.SwitchMode, error) {
m := fetchModeRegexp.FindStringSubmatch(metrics)
switch {
case len(m) < 2:
return 0, errors.New("import mode status is not exposed")
case m[1] == "0":
return import_sstpb.SwitchMode_Import, nil
default:
return import_sstpb.SwitchMode_Normal, nil
}
}
33 changes: 33 additions & 0 deletions lightning/backend/tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/import_sstpb"

kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/lightning/common"
Expand Down Expand Up @@ -113,3 +114,35 @@ func (s *tikvSuite) TestForAllStores(c *C) {
},
})
}

func (s *tikvSuite) TestFetchModeFromMetrics(c *C) {
testCases := []struct {
metrics string
mode import_sstpb.SwitchMode
isErr bool
}{
{
metrics: `tikv_config_rocksdb{cf="default",name="hard_pending_compaction_bytes_limit"} 274877906944`,
mode: import_sstpb.SwitchMode_Normal,
},
{
metrics: `tikv_config_rocksdb{cf="default",name="hard_pending_compaction_bytes_limit"} 0`,
mode: import_sstpb.SwitchMode_Import,
},
{
metrics: ``,
isErr: true,
},
}

for _, tc := range testCases {
comment := Commentf("test case '%s'", tc.metrics)
mode, err := kv.FetchModeFromMetrics(tc.metrics)
if tc.isErr {
c.Assert(err, NotNil, comment)
} else {
c.Assert(err, IsNil, comment)
c.Assert(mode, Equals, tc.mode, comment)
}
}
}