Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#2512
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
overvenus authored and ti-chi-bot committed Sep 17, 2021
1 parent cc8157d commit b8a5438
Showing 20 changed files with 1,719 additions and 136 deletions.
4 changes: 2 additions & 2 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ type Capture struct {
cancel context.CancelFunc

newProcessorManager func() *processor.Manager
newOwner func() *owner.Owner
newOwner func(pd.Client) *owner.Owner
}

// NewCapture returns a new Capture instance
@@ -245,7 +245,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
}

log.Info("campaign owner successfully", zap.String("capture-id", c.info.ID))
owner := c.newOwner()
owner := c.newOwner(c.pdClient)
c.setOwner(owner)
err = c.runEtcdWorker(ctx, owner, model.NewGlobalState(), ownerFlushInterval)
c.setOwner(nil)
254 changes: 254 additions & 0 deletions cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package capture

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/r3labs/diff"
"github.com/tikv/client-go/v2/oracle"
)

// verifyCreateChangefeedConfig validates the ChangefeedConfig carried by a
// "create changefeed" request and, when every check passes, returns the
// ChangeFeedInfo built from it.
//
// The checks are performed in this order:
//  1. the sink URI is not empty;
//  2. the changefeed ID is accepted by model.ValidateChangefeedID;
//  3. no status for that ID can be read from etcd;
//  4. the start-ts (fetched from PD when it is zero) passes
//     gc.EnsureChangefeedStartTsSafety;
//  5. a non-zero target-ts is strictly greater than the start-ts;
//  6. unless force-replicate or ignore-ineligible-table is set, no table
//     kept by the filter is ineligible;
//  7. the time zone can be resolved and the sink passes verifySink.
//
// changefeedConfig is received by value, so filling in StartTS here does not
// affect the caller's copy.
func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.ChangefeedConfig, capture *Capture) (*model.ChangeFeedInfo, error) {
	// A changefeed cannot be created without a sink.
	if changefeedConfig.SinkURI == "" {
		return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("sink-uri is empty, can't not create a changefeed without sink-uri")
	}

	// The changefeed ID must be well-formed.
	if idErr := model.ValidateChangefeedID(changefeedConfig.ID); idErr != nil {
		return nil, cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedConfig.ID)
	}

	// Reading the status serves two purposes: it detects an ID that is
	// already taken, and it surfaces any etcd client failure other than
	// "changefeed does not exist".
	existing, _, err := capture.etcdClient.GetChangeFeedStatus(ctx, changefeedConfig.ID)
	if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
		return nil, err
	}
	if existing != nil {
		return nil, cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(changefeedConfig.ID)
	}

	// A zero start-ts means "start from now": ask PD for the current timestamp.
	if changefeedConfig.StartTS == 0 {
		physical, logical, tsErr := capture.pdClient.GetTS(ctx)
		if tsErr != nil {
			return nil, cerror.ErrPDEtcdAPIError.GenWithStackByArgs("fail to get ts from pd client")
		}
		changefeedConfig.StartTS = oracle.ComposeTS(physical, logical)
	}

	// Ensure the start ts is valid in the next 1 hour.
	const ensureTTL = 60 * 60
	if safetyErr := gc.EnsureChangefeedStartTsSafety(
		ctx, capture.pdClient, changefeedConfig.ID, ensureTTL, changefeedConfig.StartTS); safetyErr != nil {
		// "start-ts before GC" is reported unchanged; anything else is
		// wrapped as a PD/etcd API error.
		if cerror.ErrStartTsBeforeGC.Equal(safetyErr) {
			return nil, safetyErr
		}
		return nil, cerror.ErrPDEtcdAPIError.Wrap(safetyErr)
	}

	// A target-ts, when given, must lie after the start-ts.
	if changefeedConfig.TargetTS > 0 && changefeedConfig.TargetTS <= changefeedConfig.StartTS {
		return nil, cerror.ErrTargetTsBeforeStartTs.GenWithStackByArgs(changefeedConfig.TargetTS, changefeedConfig.StartTS)
	}

	// Start from the default replica config and overlay the values that
	// the request actually provides.
	replicaCfg := config.GetDefaultReplicaConfig()
	replicaCfg.ForceReplicate = changefeedConfig.ForceReplicate
	if workerNum := changefeedConfig.MounterWorkerNum; workerNum != 0 {
		replicaCfg.Mounter.WorkerNum = workerNum
	}
	if sinkCfg := changefeedConfig.SinkConfig; sinkCfg != nil {
		replicaCfg.Sink = sinkCfg
	}
	if ignoredTs := changefeedConfig.IgnoreTxnStartTs; len(ignoredTs) != 0 {
		replicaCfg.Filter.IgnoreTxnStartTs = ignoredTs
	}
	if rules := changefeedConfig.FilterRules; len(rules) != 0 {
		replicaCfg.Filter.Rules = rules
	}

	// The sort engine and the old-value switch depend on the version of
	// the running TiCDC cluster.
	_, captureInfos, err := capture.etcdClient.GetCaptures(ctx)
	if err != nil {
		return nil, err
	}
	clusterVersion, err := version.GetTiCDCClusterVersion(captureInfos)
	if err != nil {
		return nil, err
	}
	engine := model.SortUnified
	if !clusterVersion.ShouldEnableOldValueByDefault() {
		replicaCfg.EnableOldValue = false
		log.Warn("The TiCDC cluster is built from unknown branch or less than 5.0.0-rc, the old-value are disabled by default.")
		// The unified sorter default is only consulted for clusters that
		// also do not enable old-value by default.
		if !clusterVersion.ShouldEnableUnifiedSorterByDefault() {
			engine = model.SortInMemory
		}
	}

	// Assemble the ChangeFeedInfo that will be returned on success.
	cfInfo := &model.ChangeFeedInfo{
		SinkURI:           changefeedConfig.SinkURI,
		Opts:              make(map[string]string),
		CreateTime:        time.Now(),
		StartTs:           changefeedConfig.StartTS,
		TargetTs:          changefeedConfig.TargetTS,
		Config:            replicaCfg,
		Engine:            engine,
		State:             model.StateNormal,
		SyncPointEnabled:  false,
		SyncPointInterval: 10 * time.Minute,
		CreatorVersion:    version.ReleaseVersion,
	}

	// Ineligible tables are only tolerated when the request asks for it.
	if !(replicaCfg.ForceReplicate || changefeedConfig.IgnoreIneligibleTable) {
		ineligible, _, tablesErr := verifyTables(replicaCfg, capture.kvStorage, changefeedConfig.StartTS)
		if tablesErr != nil {
			return nil, tablesErr
		}
		if len(ineligible) != 0 {
			return nil, cerror.ErrTableIneligible.GenWithStackByArgs(ineligible)
		}
	}

	// The sink is verified with the requested time zone placed in the context.
	location, err := util.GetTimezone(changefeedConfig.TimeZone)
	if err != nil {
		return nil, errors.Annotate(err, "invalid timezone:"+changefeedConfig.TimeZone)
	}
	ctx = util.PutTimezoneInCtx(ctx, location)
	if err = verifySink(ctx, cfInfo.SinkURI, cfInfo.Config, cfInfo.Opts); err != nil {
		return nil, err
	}

	return cfInfo, nil
}

// verifyUpdateChangefeedConfig validates the ChangefeedConfig carried by an
// "update changefeed" request.
//
// It works on a clone of oldInfo, overlays every field of changefeedConfig
// that is set (non-zero, non-empty or non-nil), and returns the clone. The
// update is refused with ErrChangefeedUpdateRefused when oldInfo cannot be
// cloned, when the new target-ts is not greater than the start-ts, when the
// new filter rules or the new sink URI fail verification, or when the
// resulting info does not differ from oldInfo.
func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.ChangefeedConfig, oldInfo *model.ChangeFeedInfo) (*model.ChangeFeedInfo, error) {
	updated, err := oldInfo.Clone()
	if err != nil {
		return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
	}

	// target_ts: must stay strictly after the start-ts of the changefeed.
	if targetTs := changefeedConfig.TargetTS; targetTs != 0 {
		if targetTs <= updated.StartTs {
			return nil, cerror.ErrChangefeedUpdateRefused.GenWithStack("can not update target-ts:%d less than start-ts:%d", targetTs, updated.StartTs)
		}
		updated.TargetTs = targetTs
	}

	// filter rules: installed first, then checked against the whole config.
	if rules := changefeedConfig.FilterRules; len(rules) != 0 {
		updated.Config.Filter.Rules = rules
		if _, err = filter.VerifyRules(updated.Config); err != nil {
			return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
		}
	}

	// The following fields are copied over without further validation.
	if ignoredTs := changefeedConfig.IgnoreTxnStartTs; len(ignoredTs) != 0 {
		updated.Config.Filter.IgnoreTxnStartTs = ignoredTs
	}
	if workerNum := changefeedConfig.MounterWorkerNum; workerNum != 0 {
		updated.Config.Mounter.WorkerNum = workerNum
	}
	if sinkCfg := changefeedConfig.SinkConfig; sinkCfg != nil {
		updated.Config.Sink = sinkCfg
	}

	// sink_uri: verified against the config as updated by the steps above.
	if uri := changefeedConfig.SinkURI; uri != "" {
		updated.SinkURI = uri
		if err = verifySink(ctx, uri, updated.Config, updated.Opts); err != nil {
			return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
		}
	}

	// An update that changes nothing is refused rather than silently accepted.
	if diff.Changed(oldInfo, updated) {
		return updated, nil
	}
	return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs("changefeed config is the same with the old one, do nothing")
}

// verifySink checks that a sink can be built from sinkURI, cfg and opts.
//
// It builds a filter from cfg, creates a sink under the changefeed ID
// "sink-verify", closes it right away, and finally polls the sink's error
// channel once without blocking. The first error met along the way is
// returned; nil means none of these steps reported a problem.
func verifySink(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig, opts map[string]string) error {
	tableFilter, err := filter.NewFilter(cfg)
	if err != nil {
		return err
	}

	// TODO: find a better way to verify a sinkURI is valid
	sinkErrCh := make(chan error)
	probe, err := sink.NewSink(ctx, "sink-verify", sinkURI, tableFilter, cfg, opts, sinkErrCh)
	if err != nil {
		return err
	}
	if err = probe.Close(ctx); err != nil {
		return err
	}

	// Non-blocking receive: only an error that is ready to be delivered at
	// this moment is observed.
	select {
	case asyncErr := <-sinkErrCh:
		return asyncErr
	default:
		return nil
	}
}

// verifyTables splits the tables found in the schema snapshot taken at
// startTs into ineligible and eligible ones.
//
// Tables rejected by the filter built from replicaConfig are skipped. Every
// remaining table is classified by IsEligible(false), i.e. without force
// replication. Both result slices are nil when err is non-nil.
func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
	// Named tableFilter so that the imported filter package stays visible.
	tableFilter, err := filter.NewFilter(replicaConfig)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}
	snapshotMeta, err := kv.GetSnapshotMeta(storage, startTs)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}
	schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(snapshotMeta, startTs, false /* explicitTables */)
	if err != nil {
		return nil, nil, errors.Trace(err)
	}

	for _, tbl := range schemaSnap.Tables() {
		name := tbl.TableName
		if tableFilter.ShouldIgnoreTable(name.Schema, name.Table) {
			continue
		}
		if tbl.IsEligible(false /* forceReplicate */) {
			eligibleTables = append(eligibleTables, name)
			continue
		}
		ineligibleTables = append(ineligibleTables, name)
	}
	return ineligibleTables, eligibleTables, nil
}
23 changes: 18 additions & 5 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/ticdc/cdc/model"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/store/tikv/oracle"
@@ -39,7 +40,7 @@ type changefeed struct {
scheduler *scheduler
barriers *barriers
feedStateManager *feedStateManager
gcManager GcManager
gcManager gc.Manager

schema *schemaWrap4Owner
sink AsyncSink
@@ -68,7 +69,7 @@ type changefeed struct {
newSink func(ctx cdcContext.Context) (AsyncSink, error)
}

func newChangefeed(id model.ChangeFeedID, gcManager GcManager) *changefeed {
func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
c := &changefeed{
id: id,
scheduler: newScheduler(),
@@ -86,7 +87,7 @@ func newChangefeed(id model.ChangeFeedID, gcManager GcManager) *changefeed {
}

func newChangefeed4Test(
id model.ChangeFeedID, gcManager GcManager,
id model.ChangeFeedID, gcManager gc.Manager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error),
) *changefeed {
@@ -122,7 +123,7 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *model.ChangefeedReactor
func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs uint64) error {
state := c.state.Info.State
if state == model.StateNormal || state == model.StateStopped || state == model.StateError {
if err := c.gcManager.checkStaleCheckpointTs(ctx, checkpointTs); err != nil {
if err := c.gcManager.CheckStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
return errors.Trace(err)
}
}
@@ -202,7 +203,19 @@ LOOP:
failpoint.Return(errors.New("failpoint injected retriable error"))
})
if c.state.Info.Config.CheckGCSafePoint {
err := util.CheckSafetyOfStartTs(ctx, ctx.GlobalVars().PDClient, c.state.ID, checkpointTs)
// Check TiDB GC safepoint does not exceed the checkpoint.
//
// We update TTL to 10 minutes,
// 1. to delete the service GC safepoint effectively,
// 2. in case owner update TiCDC service GC safepoint fails.
//
// Also it unblocks TiDB GC, because the service GC safepoint is set to
// 1 hour TTL during creating changefeed.
//
// See more gc doc.
ensureTTL := int64(10 * 60)
err := gc.EnsureChangefeedStartTsSafety(
ctx, ctx.GlobalVars().PDClient, c.state.ID, ensureTTL, checkpointTs)
if err != nil {
return errors.Trace(err)
}
11 changes: 7 additions & 4 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
"github.com/pingcap/tidb/store/tikv/oracle"
@@ -107,10 +108,12 @@ type changefeedSuite struct {

func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *model.ChangefeedReactorState,
map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester) {
ctx.GlobalVars().PDClient = &mockPDClient{updateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
return safePoint, nil
}}
gcManager := newGCManager()
ctx.GlobalVars().PDClient = &gc.MockPDClient{
UpdateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
return safePoint, nil
},
}
gcManager := gc.NewManager(ctx.GlobalVars().PDClient)
cf := newChangefeed4Test(ctx.ChangefeedVars().ID, gcManager, func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
return &mockDDLPuller{resolvedTs: startTs - 1}, nil
}, func(ctx cdcContext.Context) (AsyncSink, error) {
Loading

0 comments on commit b8a5438

Please sign in to comment.