Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: show better placement schedule state #31073

Merged
merged 10 commits into from
Jan 9, 2022
25 changes: 0 additions & 25 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package infosync

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -425,30 +424,6 @@ func doRequestWithFailpoint(req *http.Request) (resp *http.Response, err error)
return util2.InternalHTTPClient().Do(req)
}

// GetReplicationState is used to check if regions in the given keyranges are replicated from PD.
func GetReplicationState(ctx context.Context, startKey []byte, endKey []byte) (bool, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return false, err
}

if is.etcdCli == nil {
return false, nil
}

addrs := is.etcdCli.Endpoints()

if len(addrs) == 0 {
return false, errors.Errorf("pd unavailable")
}

res, err := doRequest(ctx, addrs, fmt.Sprintf("%s/replicated?startKey=%s&endKey=%s", pdapi.Regions, hex.EncodeToString(startKey), hex.EncodeToString(endKey)), "GET", nil)
if err == nil && res != nil {
return string(res) == "true\n", nil
}
return false, err
}

// GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema.
func GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) {
is, err := getGlobalInfoSyncer()
Expand Down
86 changes: 86 additions & 0 deletions domain/infosync/region.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package infosync

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/util/pdapi"
)

// PlacementScheduleState is the returned third-valued state from GetReplicationState(). For convenience, the string of PD is deserialized into an enum first.
type PlacementScheduleState int

const (
// PlacementScheduleStatePending corresponds to "PENDING" from PD.
PlacementScheduleStatePending PlacementScheduleState = iota
// PlacementScheduleStateInProgress corresponds to "INPROGRESS" from PD.
PlacementScheduleStateInProgress
// PlacementScheduleStateScheduled corresponds to "REPLICATED" from PD.
PlacementScheduleStateScheduled
)

func (t PlacementScheduleState) String() string {
switch t {
case PlacementScheduleStateScheduled:
return "SCHEDULED"
case PlacementScheduleStateInProgress:
return "INPROGRESS"
case PlacementScheduleStatePending:
return "PENDING"
default:
return "PENDING"
}
}

// GetReplicationState is used to check if regions in the given keyranges are replicated from PD.
func GetReplicationState(ctx context.Context, startKey []byte, endKey []byte) (PlacementScheduleState, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return PlacementScheduleStatePending, err
}

if is.etcdCli == nil {
return PlacementScheduleStatePending, nil
}

addrs := is.etcdCli.Endpoints()

if len(addrs) == 0 {
return PlacementScheduleStatePending, errors.Errorf("pd unavailable")
}

res, err := doRequest(ctx, addrs, fmt.Sprintf("%s/replicated?startKey=%s&endKey=%s", pdapi.Regions, hex.EncodeToString(startKey), hex.EncodeToString(endKey)), "GET", nil)
if err == nil && res != nil {
st := PlacementScheduleStatePending
// it should not fail
var state string
_ = json.Unmarshal(res, &state)
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
switch state {
case "REPLICATED":
st = PlacementScheduleStateScheduled
case "INPROGRESS":
st = PlacementScheduleStateInProgress
case "PENDING":
st = PlacementScheduleStatePending
}
return st, nil
}
return PlacementScheduleStatePending, err
}
81 changes: 42 additions & 39 deletions executor/show_placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ func (e *ShowExec) fetchShowPlacementForDB(ctx context.Context) (err error) {
}

if placement != nil {
schedule, err := fetchDBScheduled(ctx, nil, dbInfo)
state, err := fetchDBScheduleState(ctx, nil, dbInfo)
if err != nil {
return err
}
e.appendRow([]interface{}{"DATABASE " + dbInfo.Name.String(), placement.String(), toScheduleStateString(schedule)})
e.appendRow([]interface{}{"DATABASE " + dbInfo.Name.String(), placement.String(), state.String()})
}

return nil
Expand All @@ -179,12 +179,12 @@ func (e *ShowExec) fetchShowPlacementForTable(ctx context.Context) (err error) {
}

if placement != nil {
schedule, err := fetchTableScheduled(ctx, nil, tblInfo)
state, err := fetchTableScheduleState(ctx, nil, tblInfo)
if err != nil {
return err
}
ident := ast.Ident{Schema: e.Table.DBInfo.Name, Name: tblInfo.Name}
e.appendRow([]interface{}{"TABLE " + ident.String(), placement.String(), toScheduleStateString(schedule)})
e.appendRow([]interface{}{"TABLE " + ident.String(), placement.String(), state.String()})
}

return nil
Expand Down Expand Up @@ -225,15 +225,15 @@ func (e *ShowExec) fetchShowPlacementForPartition(ctx context.Context) (err erro
}

if placement != nil {
schedule, err := fetchPartitionScheduled(ctx, nil, partition)
state, err := fetchPartitionScheduleState(ctx, nil, partition)
if err != nil {
return err
}
tableIndent := ast.Ident{Schema: e.Table.DBInfo.Name, Name: tblInfo.Name}
e.appendRow([]interface{}{
fmt.Sprintf("TABLE %s PARTITION %s", tableIndent.String(), partition.Name.String()),
placement.String(),
toScheduleStateString(schedule),
state.String(),
})
}

Expand All @@ -245,7 +245,7 @@ func (e *ShowExec) fetchShowPlacement(ctx context.Context) error {
return err
}

scheduled := make(map[int64]bool)
scheduled := make(map[int64]infosync.PlacementScheduleState)

if err := e.fetchAllDBPlacements(ctx, scheduled); err != nil {
return err
Expand All @@ -266,7 +266,7 @@ func (e *ShowExec) fetchAllPlacementPolicies() error {
return nil
}

func (e *ShowExec) fetchAllDBPlacements(ctx context.Context, scheduleState map[int64]bool) error {
func (e *ShowExec) fetchAllDBPlacements(ctx context.Context, scheduleState map[int64]infosync.PlacementScheduleState) error {
checker := privilege.GetPrivilegeManager(e.ctx)
activeRoles := e.ctx.GetSessionVars().ActiveRoles

Expand All @@ -284,18 +284,18 @@ func (e *ShowExec) fetchAllDBPlacements(ctx context.Context, scheduleState map[i
}

if placement != nil {
schedule, err := fetchDBScheduled(ctx, scheduleState, dbInfo)
state, err := fetchDBScheduleState(ctx, scheduleState, dbInfo)
if err != nil {
return err
}
e.appendRow([]interface{}{"DATABASE " + dbInfo.Name.String(), placement.String(), toScheduleStateString(schedule)})
e.appendRow([]interface{}{"DATABASE " + dbInfo.Name.String(), placement.String(), state.String()})
}
}

return nil
}

func (e *ShowExec) fetchAllTablePlacements(ctx context.Context, scheduleState map[int64]bool) error {
func (e *ShowExec) fetchAllTablePlacements(ctx context.Context, scheduleState map[int64]infosync.PlacementScheduleState) error {
checker := privilege.GetPrivilegeManager(e.ctx)
activeRoles := e.ctx.GetSessionVars().ActiveRoles

Expand All @@ -322,11 +322,11 @@ func (e *ShowExec) fetchAllTablePlacements(ctx context.Context, scheduleState ma
}

if tblPlacement != nil {
schedule, err := fetchTableScheduled(ctx, scheduleState, tblInfo)
state, err := fetchTableScheduleState(ctx, scheduleState, tblInfo)
if err != nil {
return err
}
rows = append(rows, []interface{}{"TABLE " + ident.String(), tblPlacement.String(), toScheduleStateString(schedule)})
rows = append(rows, []interface{}{"TABLE " + ident.String(), tblPlacement.String(), state.String()})
}

if tblInfo.Partition != nil {
Expand All @@ -338,14 +338,14 @@ func (e *ShowExec) fetchAllTablePlacements(ctx context.Context, scheduleState ma
}

if partitionPlacement != nil {
schedule, err := fetchPartitionScheduled(ctx, scheduleState, &partition)
state, err := fetchPartitionScheduleState(ctx, scheduleState, &partition)
if err != nil {
return err
}
rows = append(rows, []interface{}{
fmt.Sprintf("TABLE %s PARTITION %s", ident.String(), partition.Name.String()),
partitionPlacement.String(),
toScheduleStateString(schedule),
state.String(),
})
}
}
Expand Down Expand Up @@ -421,7 +421,7 @@ func (e *ShowExec) getPolicyPlacement(policyRef *model.PolicyRefInfo) (settings
return policy.PlacementSettings, nil
}

func fetchScheduled(ctx context.Context, scheduleState map[int64]bool, id int64) (bool, error) {
func fetchScheduleState(ctx context.Context, scheduleState map[int64]infosync.PlacementScheduleState, id int64) (infosync.PlacementScheduleState, error) {
if s, ok := scheduleState[id]; ok {
return s, nil
}
Expand All @@ -434,26 +434,30 @@ func fetchScheduled(ctx context.Context, scheduleState map[int64]bool, id int64)
return schedule, err
}

func fetchPartitionScheduled(ctx context.Context, scheduleState map[int64]bool, part *model.PartitionDefinition) (bool, error) {
return fetchScheduled(ctx, scheduleState, part.ID)
func fetchPartitionScheduleState(ctx context.Context, scheduleState map[int64]infosync.PlacementScheduleState, part *model.PartitionDefinition) (infosync.PlacementScheduleState, error) {
return fetchScheduleState(ctx, scheduleState, part.ID)
}

func fetchTableScheduled(ctx context.Context, scheduleState map[int64]bool, table *model.TableInfo) (bool, error) {
schedule, err := fetchScheduled(ctx, scheduleState, table.ID)
func fetchTableScheduleState(ctx context.Context, scheduleState map[int64]infosync.PlacementScheduleState, table *model.TableInfo) (infosync.PlacementScheduleState, error) {
state := infosync.PlacementScheduleStateScheduled

schedule, err := fetchScheduleState(ctx, scheduleState, table.ID)
if err != nil {
return false, err
return state, err
}
if !schedule {
return false, nil
state = accumulateState(state, schedule)
if state != infosync.PlacementScheduleStateScheduled {
return state, nil
}

if table.GetPartitionInfo() != nil {
for _, part := range table.GetPartitionInfo().Definitions {
schedule, err = fetchScheduled(ctx, scheduleState, part.ID)
schedule, err = fetchScheduleState(ctx, scheduleState, part.ID)
if err != nil {
return false, err
return infosync.PlacementScheduleStatePending, err
}
if !schedule {
state = accumulateState(state, schedule)
if state != infosync.PlacementScheduleStateScheduled {
break
}
}
Expand All @@ -462,26 +466,25 @@ func fetchTableScheduled(ctx context.Context, scheduleState map[int64]bool, tabl
return schedule, nil
}

func fetchDBScheduled(ctx context.Context, scheduleState map[int64]bool, db *model.DBInfo) (bool, error) {
schedule := true

var err error
func fetchDBScheduleState(ctx context.Context, scheduleState map[int64]infosync.PlacementScheduleState, db *model.DBInfo) (infosync.PlacementScheduleState, error) {
state := infosync.PlacementScheduleStateScheduled
for _, table := range db.Tables {
schedule, err = fetchTableScheduled(ctx, scheduleState, table)
schedule, err := fetchTableScheduleState(ctx, scheduleState, table)
if err != nil {
return false, err
return state, err
}
if !schedule {
state = accumulateState(state, schedule)
if state != infosync.PlacementScheduleStateScheduled {
break
}
}

return schedule, nil
return state, nil
}

func toScheduleStateString(schedule bool) string {
if schedule {
return "SCHEDULED"
func accumulateState(curr, news infosync.PlacementScheduleState) infosync.PlacementScheduleState {
a, b := int(curr), int(news)
if a > b {
return news
}
return "INPROGRESS"
return curr
}
Loading