Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Some SwitchMode improvements #287

Merged
merged 2 commits into from
Apr 13, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
tidb-lightning-ctl: added --fetch-mode subcommand
kennytm committed Mar 23, 2020

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit b63c45a2bd454f2d01882610ae4930f1a10c0a65
23 changes: 22 additions & 1 deletion cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ func main() {

func run() error {
var (
compact *bool
compact, flagFetchMode *bool
mode, flagImportEngine, flagCleanupEngine *string
cpRemove, cpErrIgnore, cpErrDestroy, cpDump *string

@@ -51,6 +51,7 @@ func run() error {
globalCfg := config.Must(config.LoadGlobalConfig(os.Args[1:], func(fs *flag.FlagSet) {
compact = fs.Bool("compact", false, "do manual compaction on the target cluster")
mode = fs.String("switch-mode", "", "switch tikv into import mode or normal mode, values can be ['import', 'normal']")
flagFetchMode = fs.Bool("fetch-mode", false, "obtain the current mode of every tikv in the cluster")

flagImportEngine = fs.String("import-engine", "", "manually import a closed engine (value can be '`db`.`table`:123' or a UUID")
flagCleanupEngine = fs.String("cleanup-engine", "", "manually delete a closed engine")
@@ -84,6 +85,9 @@ func run() error {
if *compact {
return errors.Trace(compactCluster(ctx, cfg, tls))
}
if *flagFetchMode {
return errors.Trace(fetchMode(ctx, cfg, tls))
}
if len(*mode) != 0 {
return errors.Trace(switchMode(ctx, cfg, tls, *mode))
}
@@ -143,6 +147,23 @@ func switchMode(ctx context.Context, cfg *config.Config, tls *common.TLS, mode s
)
}

func fetchMode(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
return kv.ForAllStores(
ctx,
tls.WithHost(cfg.TiDB.PdAddr),
kv.StoreStateDisconnected,
func(c context.Context, store *kv.Store) error {
mode, err := kv.FetchMode(c, tls, store.Address)
if err != nil {
fmt.Fprintf(os.Stderr, "%-30s | Error: %v\n", store.Address, err)
} else {
fmt.Fprintf(os.Stderr, "%-30s | %s mode\n", store.Address, mode)
}
return nil
},
)
}

func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
if err != nil {
33 changes: 33 additions & 0 deletions lightning/backend/tikv.go
Original file line number Diff line number Diff line change
@@ -15,8 +15,10 @@ package backend

import (
"context"
"regexp"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/debugpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@@ -159,3 +161,34 @@ func Compact(ctx context.Context, tls *common.TLS, tikvAddr string, level int32)
task.End(zap.ErrorLevel, err)
return err
}

var fetchModeRegexp = regexp.MustCompile(`\btikv_config_rocksdb\{cf="default",name="hard_pending_compaction_bytes_limit"\} ([^\n]+)`)

// FetchMode obtains the import mode status of the TiKV node.
func FetchMode(ctx context.Context, tls *common.TLS, tikvAddr string) (import_sstpb.SwitchMode, error) {
conn, err := grpc.DialContext(ctx, tikvAddr, tls.ToGRPCDialOption())
if err != nil {
return 0, err
}
defer conn.Close()

client := debugpb.NewDebugClient(conn)
resp, err := client.GetMetrics(ctx, &debugpb.GetMetricsRequest{All: false})
if err != nil {
return 0, errors.Trace(err)
}
return FetchModeFromMetrics(resp.Prometheus)
}

// FetchMode obtains the import mode status from the Prometheus metrics of a TiKV node.
func FetchModeFromMetrics(metrics string) (import_sstpb.SwitchMode, error) {
m := fetchModeRegexp.FindStringSubmatch(metrics)
switch {
case len(m) < 2:
return 0, errors.New("import mode status is not exposed")
case m[1] == "0":
return import_sstpb.SwitchMode_Import, nil
default:
return import_sstpb.SwitchMode_Normal, nil
}
}
33 changes: 33 additions & 0 deletions lightning/backend/tikv_test.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (
"sync"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/import_sstpb"

kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/lightning/common"
@@ -113,3 +114,35 @@ func (s *tikvSuite) TestForAllStores(c *C) {
},
})
}

func (s *tikvSuite) TestFetchModeFromMetrics(c *C) {
testCases := []struct {
metrics string
mode import_sstpb.SwitchMode
isErr bool
}{
{
metrics: `tikv_config_rocksdb{cf="default",name="hard_pending_compaction_bytes_limit"} 274877906944`,
mode: import_sstpb.SwitchMode_Normal,
},
{
metrics: `tikv_config_rocksdb{cf="default",name="hard_pending_compaction_bytes_limit"} 0`,
mode: import_sstpb.SwitchMode_Import,
},
{
metrics: ``,
isErr: true,
},
}

for _, tc := range testCases {
comment := Commentf("test case '%s'", tc.metrics)
mode, err := kv.FetchModeFromMetrics(tc.metrics)
if tc.isErr {
c.Assert(err, NotNil, comment)
} else {
c.Assert(err, IsNil, comment)
c.Assert(mode, Equals, tc.mode, comment)
}
}
}