Skip to content

Commit

Permalink
*: Retry when placement PutBundles failed (#30590)
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao authored Dec 10, 2021
1 parent fbcf757 commit 89fd697
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 11 deletions.
8 changes: 4 additions & 4 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
Expand Down Expand Up @@ -1040,7 +1040,7 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
if job.Type == model.ActionAddTablePartition {
// It is rollbacked from adding table partition, just remove addingDefinitions from tableInfo.
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundles(context.TODO(), rollbackBundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -1208,7 +1208,7 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
return ver, errors.Trace(err)
}

err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -1412,7 +1412,7 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func onAlterPlacementPolicy(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
cp := bundle.Clone()
bundles = append(bundles, cp.Reset(placement.RuleIndexPartition, []int64{id}))
}
err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down
8 changes: 4 additions & 4 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func onCreateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
}

// Send the placement bundle to PD.
err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -580,7 +580,7 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ erro
return ver, errors.Trace(err)
}

err = infosync.PutRuleBundles(context.TODO(), bundles)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Wrapf(err, "failed to notify PD the placement rules")
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func onAlterTablePartitionOptions(d *ddlCtx, t *meta.Meta, job *model.Job) (ver

// Send the placement bundle to PD.
if bundle != nil {
err = infosync.PutRuleBundles(context.TODO(), []*placement.Bundle{bundle})
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
}

if err != nil {
Expand Down Expand Up @@ -1353,7 +1353,7 @@ func onAlterTablePlacement(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,

// Send the placement bundle to PD.
if bundle != nil {
err = infosync.PutRuleBundles(context.TODO(), []*placement.Bundle{bundle})
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), []*placement.Bundle{bundle})
}

if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions domain/infosync/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package infosync

import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/util/dbterror"
)

// ErrHTTPServiceError is the error reported when an HTTP request gets a
// response whose status code does not indicate success.
var ErrHTTPServiceError = dbterror.ClassDomain.NewStdErr(
	errno.ErrHTTPServiceError,
	mysql.Message("HTTP request failed with status %s", nil),
)
41 changes: 40 additions & 1 deletion domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ const (
TopologyPrometheus = "/topology/prometheus"
// TablePrometheusCacheExpiry is the expiry time for prometheus address cache.
TablePrometheusCacheExpiry = 10 * time.Second
// RequestRetryInterval is how long to wait before retrying a failed HTTP request.
RequestRetryInterval = 200 * time.Millisecond
// SyncBundlesMaxRetry is the maximum number of retries when syncing placement bundles.
SyncBundlesMaxRetry = 3
)

// ErrPrometheusAddrIsNotSet is the error that Prometheus address is not set in PD and etcd
Expand Down Expand Up @@ -353,7 +357,7 @@ func doRequest(ctx context.Context, addrs []string, route, method string, body i
return nil, err
}
if res.StatusCode != http.StatusOK {
err = errors.Errorf("%s", bodyBytes)
err = ErrHTTPServiceError.FastGen("%s", bodyBytes)
if res.StatusCode == http.StatusNotFound || res.StatusCode == http.StatusPreconditionFailed {
err = nil
bodyBytes = nil
Expand Down Expand Up @@ -427,6 +431,16 @@ func GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error)

// PutRuleBundles is used to post specific rule bundles to PD.
func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
failpoint.Inject("putRuleBundlesError", func(isServiceError failpoint.Value) {
var err error
if isServiceError.(bool) {
err = ErrHTTPServiceError.FastGen("mock service error")
} else {
err = errors.New("mock other error")
}
failpoint.Return(err)
})

is, err := getGlobalInfoSyncer()
if err != nil {
return err
Expand All @@ -435,6 +449,31 @@ func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error {
return is.placementManager.PutRuleBundles(ctx, bundles)
}

// PutRuleBundlesWithRetry calls PutRuleBundles and retries it when it fails
// with a retryable error.
//
// The first attempt is always made and is followed by at most maxRetry further
// attempts, so PutRuleBundles is called at most maxRetry+1 times. A negative
// maxRetry is treated as 0.
//
// An error that matches ErrHTTPServiceError is returned immediately and never
// retried; any other error triggers a retry after waiting for interval.
// If ctx is done while waiting between attempts, the wait is abandoned and
// ctx.Err() is returned.
//
// It returns nil on success, otherwise the error of the last attempt (or
// ctx.Err() when the wait was interrupted).
func PutRuleBundlesWithRetry(ctx context.Context, bundles []*placement.Bundle, maxRetry int, interval time.Duration) (err error) {
	if maxRetry < 0 {
		maxRetry = 0
	}

	for i := 0; i <= maxRetry; i++ {
		if err = PutRuleBundles(ctx, bundles); err == nil || ErrHTTPServiceError.Equal(err) {
			return err
		}

		// No attempts left: fall through and report the last error.
		if i == maxRetry {
			break
		}

		logutil.BgLogger().Warn("Error occurs when PutRuleBundles, retry", zap.Error(err))

		// Wait on a timer instead of time.Sleep so that a cancelled or expired
		// context does not keep the caller blocked for the remaining retries.
		timer := time.NewTimer(interval)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
	}

	return err
}

// PutRuleBundlesWithDefaultRetry posts the bundles via PutRuleBundlesWithRetry,
// using SyncBundlesMaxRetry as the retry limit and RequestRetryInterval as the
// wait between attempts.
func PutRuleBundlesWithDefaultRetry(ctx context.Context, bundles []*placement.Bundle) (err error) {
	err = PutRuleBundlesWithRetry(ctx, bundles, SyncBundlesMaxRetry, RequestRetryInterval)
	return err
}

func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) {
allInfo := make(map[string]*ServerInfo)
if is.etcdCli == nil {
Expand Down
65 changes: 65 additions & 0 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/testbridge"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/integration"
Expand Down Expand Up @@ -145,3 +147,66 @@ func (is *InfoSyncer) ttlKeyExists(ctx context.Context) (bool, error) {
}
return len(resp.Kvs) == 1, nil
}

// TestPutBundlesRetry checks the retry policy of PutRuleBundlesWithRetry using
// the putRuleBundlesError failpoint:
//   - a service error is returned at once and nothing is stored;
//   - other errors are retried, and the bundle is stored once a call succeeds;
//   - when every attempt fails, the last error is returned and nothing is stored.
func TestPutBundlesRetry(t *testing.T) {
	const fpName = "github.com/pingcap/tidb/domain/infosync/putRuleBundlesError"
	ctx := context.TODO()

	_, err := GlobalInfoSyncerInit(ctx, "test", func() uint64 { return 1 }, nil, false)
	require.NoError(t, err)

	bundle, err := placement.NewBundleFromOptions(&model.PlacementSettings{PrimaryRegion: "r1", Regions: "r1,r2"})
	require.NoError(t, err)
	bundle = bundle.Reset(placement.RuleIndexTable, []int64{1024})

	// arm first stores a bundle that carries only the ID, then enables the
	// failpoint with the given term so the following puts fail as scripted.
	arm := func(t *testing.T, term string) {
		require.NoError(t, PutRuleBundles(ctx, []*placement.Bundle{{ID: bundle.ID}}))
		require.NoError(t, failpoint.Enable(fpName, term))
	}
	// disarm switches the failpoint off again.
	disarm := func(t *testing.T) {
		require.NoError(t, failpoint.Disable(fpName))
	}
	// putWithRetry runs the function under test with 3 retries and a 1ms interval.
	putWithRetry := func() error {
		return PutRuleBundlesWithRetry(ctx, []*placement.Bundle{bundle}, 3, time.Millisecond)
	}

	t.Run("serviceErrorShouldNotRetry", func(t *testing.T) {
		// Only the first call fails; a retry would succeed and store the bundle.
		arm(t, "1*return(true)")
		defer disarm(t)

		putErr := putWithRetry()
		require.Error(t, putErr)
		require.Equal(t, "[domain:8243]mock service error", putErr.Error())

		stored, getErr := GetRuleBundle(ctx, bundle.ID)
		require.NoError(t, getErr)
		require.True(t, stored.IsEmpty())
	})

	t.Run("nonServiceErrorShouldRetry", func(t *testing.T) {
		// Three failures followed by a success on the fourth (last) attempt.
		arm(t, "3*return(false)")
		defer disarm(t)

		require.NoError(t, putWithRetry())

		stored, getErr := GetRuleBundle(ctx, bundle.ID)
		require.NoError(t, getErr)

		storedJSON, marshalErr := json.Marshal(stored)
		require.NoError(t, marshalErr)

		wantJSON, marshalErr := json.Marshal(bundle)
		require.NoError(t, marshalErr)

		require.Equal(t, wantJSON, storedJSON)
	})

	t.Run("nonServiceErrorRetryAndFail", func(t *testing.T) {
		// All four attempts fail, so the retries are exhausted.
		arm(t, "4*return(false)")
		defer disarm(t)

		putErr := putWithRetry()
		require.Error(t, putErr)
		require.Equal(t, "mock other error", putErr.Error())

		stored, getErr := GetRuleBundle(ctx, bundle.ID)
		require.NoError(t, getErr)
		require.True(t, stored.IsEmpty())
	})
}
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ const (
ErrPlacementPolicyWithDirectOption = 8240
ErrPlacementPolicyInUse = 8241
ErrOptOnCacheTable = 8242
ErrHTTPServiceError = 8243
// TiKV/PD/TiFlash errors.
ErrPDServerTimeout = 9001
ErrTiKVServerTimeout = 9002
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,11 @@ error = '''
Information schema is changed during the execution of the statement(for example, table definition may be updated by other DDL ran in parallel). If you see this error often, try increasing `tidb_max_delta_schema_count`. [try again later]
'''

["domain:8243"]
error = '''
HTTP request failed with status %s
'''

["domain:9009"]
error = '''
Prometheus address is not set in PD and etcd
Expand Down
2 changes: 1 addition & 1 deletion store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1914,7 +1914,7 @@ func (w *GCWorker) doGCPlacementRules(dr util.DelRangeTask) (err error) {
for _, id := range physicalTableIDs {
bundles = append(bundles, placement.NewBundle(id))
}
return infosync.PutRuleBundles(context.TODO(), bundles)
return infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
}

func (w *GCWorker) doGCLabelRules(dr util.DelRangeTask) (err error) {
Expand Down

0 comments on commit 89fd697

Please sign in to comment.