upstream (ticdc): add upstream to manage all upstream related resources. #5282

Merged · 32 commits · May 5, 2022
Changes from all commits
Commits
654cae4
upstream (ticdc): add upstream
asddongmen Apr 22, 2022
145b4c5
Merge branch 'master' into decouple_pdClient
asddongmen Apr 24, 2022
734ecec
upstream (ticdc): fix all unit tests.
asddongmen Apr 26, 2022
f0730d8
Merge branch 'master' into add_upstream
asddongmen Apr 26, 2022
a8fd7cd
fix error and typos
asddongmen Apr 26, 2022
17702b8
Merge branch 'add_upstream' of github.com:asddongmen/ticdc into add_u…
asddongmen Apr 26, 2022
0ab8c17
upstream (ticdc): fix panic.
asddongmen Apr 27, 2022
9cf3e10
upstream (ticdc): address comment
asddongmen Apr 27, 2022
1ad9b30
Merge branch 'master' into add_upstream
asddongmen Apr 27, 2022
66ba94d
address comments
asddongmen Apr 27, 2022
885c916
Merge branch 'master' into add_upstream
asddongmen Apr 27, 2022
3c17eaf
Merge branch 'master' into add_upstream
asddongmen Apr 28, 2022
c01420f
upstream (ticdc): add status for upstream.
asddongmen Apr 29, 2022
fd89b45
Merge branch 'add_upstream' of github.com:asddongmen/ticdc into add_u…
asddongmen Apr 29, 2022
62e2721
Merge branch 'master' into add_upstream
asddongmen Apr 29, 2022
5b3664d
fix error
asddongmen Apr 29, 2022
41f988e
Merge branch 'add_upstream' of github.com:asddongmen/ticdc into add_u…
asddongmen Apr 29, 2022
c092491
fix error
asddongmen Apr 29, 2022
acbc16a
upstream (ticdc): address comments
asddongmen May 5, 2022
4054983
fix lint error
asddongmen May 5, 2022
7ab1bd2
Merge branch 'master' into add_upstream
asddongmen May 5, 2022
167d1dc
fix error
asddongmen May 5, 2022
fcc902b
Merge branch 'master' into add_upstream
asddongmen May 5, 2022
2d47bff
address comments
asddongmen May 5, 2022
e388ed3
Merge branch 'add_upstream' of github.com:asddongmen/ticdc into add_u…
asddongmen May 5, 2022
6a04cbb
address comments
asddongmen May 5, 2022
4243f57
Merge branch 'master' into add_upstream
asddongmen May 5, 2022
f3cfa69
address comments.
asddongmen May 5, 2022
e7668ef
Merge branch 'add_upstream' of github.com:asddongmen/ticdc into add_u…
asddongmen May 5, 2022
8559044
Merge branch 'master' into add_upstream
asddongmen May 5, 2022
4122dc4
resolved conflict
asddongmen May 5, 2022
ff2435b
fix error
asddongmen May 5, 2022
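The diffs below remove the per-capture PD client, KV storage, PD clock, gRPC pool, and region cache, and route all of them through a new pkg/upstream package. That package is not shown on this page, so the following is only a rough sketch inferred from the identifiers the diffs use (Manager, Upstream, DefaultClusterID, PDClient, KVStorage, PDClock, GCManager, IsNormal); the field types, constant value, and method bodies are assumptions, not the merged implementation.

```go
// Sketch only: inferred from the call sites in this PR, not the real pkg/upstream code.
package upstream

import (
	"context"

	tidbkv "github.com/pingcap/tidb/kv"
	"github.com/pingcap/tiflow/pkg/pdtime"
	"github.com/pingcap/tiflow/pkg/txnutil/gc"
	pd "github.com/tikv/pd/client"
)

// DefaultClusterID identifies the single upstream cluster supported today.
const DefaultClusterID uint64 = 0 // value assumed

// Upstream bundles every resource tied to one upstream PD/TiKV cluster.
type Upstream struct {
	PDClient  pd.Client
	KVStorage tidbkv.Storage
	PDClock   *pdtime.PDClock
	GCManager gc.Manager
	// ... gRPC pool, region cache, health status, etc.
}

// IsNormal reports whether the upstream's resources are initialized and healthy.
func (u *Upstream) IsNormal() bool { return true /* placeholder */ }

// Manager owns the Upstreams of a capture, keyed by cluster ID.
type Manager struct {
	ctx       context.Context
	upstreams map[uint64]*Upstream
}

// NewManager creates an empty manager bound to ctx.
func NewManager(ctx context.Context) *Manager {
	return &Manager{ctx: ctx, upstreams: make(map[uint64]*Upstream)}
}

// Add creates the Upstream for clusterID from the given PD endpoints.
func (m *Manager) Add(clusterID uint64, pdEndpoints []string) error { return nil /* placeholder */ }

// Get returns the Upstream previously registered for clusterID.
func (m *Manager) Get(clusterID uint64) *Upstream { return m.upstreams[clusterID] }

// Close releases every managed Upstream.
func (m *Manager) Close() {}
```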
11 changes: 8 additions & 3 deletions cdc/api/validator.go
@@ -30,6 +30,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/r3labs/diff"
@@ -42,6 +43,9 @@ func verifyCreateChangefeedConfig(
changefeedConfig model.ChangefeedConfig,
capture *capture.Capture,
) (*model.ChangeFeedInfo, error) {
// TODO(dongmen): we should pass ClusterID in ChangefeedConfig in the upcoming future
upStream := capture.UpstreamManager.Get(upstream.DefaultClusterID)

// verify sinkURI
if changefeedConfig.SinkURI == "" {
return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("sink-uri is empty, can't not create a changefeed without sink-uri")
@@ -63,7 +67,7 @@ func verifyCreateChangefeedConfig(

// verify start-ts
if changefeedConfig.StartTS == 0 {
ts, logical, err := capture.PDClient.GetTS(ctx)
ts, logical, err := upStream.PDClient.GetTS(ctx)
if err != nil {
return nil, cerror.ErrPDEtcdAPIError.GenWithStackByArgs("fail to get ts from pd client")
}
@@ -73,7 +77,8 @@ func verifyCreateChangefeedConfig(
// Ensure the start ts is valid in the next 1 hour.
const ensureTTL = 60 * 60
if err := gc.EnsureChangefeedStartTsSafety(
ctx, capture.PDClient,
ctx,
upStream.PDClient,
model.DefaultChangeFeedID(changefeedConfig.ID),
ensureTTL, changefeedConfig.StartTS); err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
@@ -137,7 +142,7 @@ func verifyCreateChangefeedConfig(
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := VerifyTables(replicaConfig, capture.Storage, changefeedConfig.StartTS)
ineligibleTables, _, err := VerifyTables(replicaConfig, upStream.KVStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
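With this change the API layer no longer reads capture.PDClient or capture.Storage; it resolves the default upstream through the capture's UpstreamManager and uses that upstream's PD client and KV storage. A minimal, hypothetical helper in the same spirit (currentStartTs is not part of the PR; only UpstreamManager.Get and upstream.DefaultClusterID come from the diff) might look like:

```go
// Illustrative only: mirrors the validator's new pattern of going through
// the capture's UpstreamManager instead of capture-level PD/KV fields.
package api

import (
	"context"

	"github.com/pingcap/tiflow/cdc/capture"
	cerror "github.com/pingcap/tiflow/pkg/errors"
	"github.com/pingcap/tiflow/pkg/upstream"
	"github.com/tikv/client-go/v2/oracle"
)

// currentStartTs picks a start-ts for a new changefeed when the request omits one.
func currentStartTs(ctx context.Context, cp *capture.Capture) (uint64, error) {
	up := cp.UpstreamManager.Get(upstream.DefaultClusterID)
	ts, logical, err := up.PDClient.GetTS(ctx)
	if err != nil {
		return 0, cerror.ErrPDEtcdAPIError.GenWithStackByArgs("fail to get ts from pd client")
	}
	return oracle.ComposeTS(ts, logical), nil
}
```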
90 changes: 27 additions & 63 deletions cdc/capture/capture.go
@@ -24,15 +24,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/processor"
@@ -44,7 +40,7 @@ import (
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
)

@@ -55,19 +51,16 @@ type Capture struct {
info *model.CaptureInfo
processorManager *processor.Manager

ownerMu sync.Mutex
owner owner.Owner
pdEnpoints []string
UpstreamManager *upstream.Manager
ownerMu sync.Mutex
owner owner.Owner

// session keeps alive between the capture and etcd
session *concurrency.Session
election *concurrency.Election

PDClient pd.Client
Storage tidbkv.Storage
EtcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool
regionCache *tikv.RegionCache
pdClock *pdtime.PDClock
sorterSystem *ssystem.System

enableNewScheduler bool
@@ -89,20 +82,18 @@ type Capture struct {

cancel context.CancelFunc

newProcessorManager func() *processor.Manager
newOwner func(pd.Client) owner.Owner
newProcessorManager func(upstreamManager *upstream.Manager) *processor.Manager
newOwner func(upstreamManager *upstream.Manager) owner.Owner
}

// NewCapture returns a new Capture instance
func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper) *Capture {
func NewCapture(pdEnpoints []string, etcdClient *etcd.CDCEtcdClient, grpcService *p2p.ServerWrapper) *Capture {
conf := config.GetGlobalServerConfig()
return &Capture{
PDClient: pdClient,
Storage: kvStorage,
EtcdClient: etcdClient,
grpcService: grpcService,
cancel: func() {},

EtcdClient: etcdClient,
grpcService: grpcService,
cancel: func() {},
pdEnpoints: pdEnpoints,
enableNewScheduler: conf.Debug.EnableNewScheduler,
newProcessorManager: processor.NewManager,
newOwner: owner.NewOwner,
@@ -135,23 +126,26 @@ func (c *Capture) reset(ctx context.Context) error {
AdvertiseAddr: conf.AdvertiseAddr,
Version: version.ReleaseVersion,
}
c.processorManager = c.newProcessorManager()

if c.UpstreamManager != nil {
c.UpstreamManager.Close()
}
c.UpstreamManager = upstream.NewManager(ctx)
err = c.UpstreamManager.Add(upstream.DefaultClusterID, c.pdEnpoints)
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"add default upstream failed")
}

c.processorManager = c.newProcessorManager(c.UpstreamManager)
if c.session != nil {
// It can't be handled even after it fails, so we ignore it.
_ = c.session.Close()
}
c.session = sess
c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey)

if c.pdClock != nil {
c.pdClock.Stop()
}

c.pdClock, err = pdtime.NewClock(ctx, c.PDClient)
if err != nil {
return errors.Trace(err)
}

if c.tableActorSystem != nil {
c.tableActorSystem.Stop()
}
@@ -183,9 +177,6 @@ func (c *Capture) reset(ctx context.Context) error {
"create sorter system")
}
}
if c.grpcPool != nil {
c.grpcPool.Close()
}

if c.enableNewScheduler {
c.grpcService.Reset(nil)
@@ -197,12 +188,6 @@ func (c *Capture) reset(ctx context.Context) error {
}
}

c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
if c.regionCache != nil {
c.regionCache.Close()
}
c.regionCache = tikv.NewRegionCache(c.PDClient)

if c.enableNewScheduler {
messageServerConfig := conf.Debug.Messages.ToMessageServerConfig()
c.MessageServer = p2p.NewMessageServer(c.info.ID, messageServerConfig)
@@ -265,13 +250,8 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.PDClient,
KVStorage: c.Storage,
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
GrpcPool: c.grpcPool,
RegionCache: c.regionCache,
PDClock: c.pdClock,
TableActorSystem: c.tableActorSystem,
SorterSystem: c.sorterSystem,
MessageServer: c.MessageServer,
@@ -324,16 +304,6 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval, "processor")
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
wg.Add(1)
go func() {
defer wg.Done()
c.pdClock.Run(ctx)
}()
wg.Add(1)
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
}()
if c.enableNewScheduler {
wg.Add(1)
go func() {
@@ -419,7 +389,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
zap.String("captureID", c.info.ID),
zap.Int64("ownerRev", ownerRev))

owner := c.newOwner(c.PDClient)
owner := c.newOwner(c.UpstreamManager)
c.setOwner(owner)

globalState := orchestrator.NewGlobalState()
@@ -541,13 +511,7 @@ func (c *Capture) AsyncClose() {
if c.processorManager != nil {
c.processorManager.AsyncClose()
}
if c.grpcPool != nil {
c.grpcPool.Close()
}
if c.regionCache != nil {
c.regionCache.Close()
c.regionCache = nil
}

if c.tableActorSystem != nil {
c.tableActorSystem.Stop()
c.tableActorSystem = nil
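The capture now owns an upstream.Manager instead of raw PD/KV handles: reset() closes any previous manager, creates a fresh one, and registers the default upstream from the configured PD endpoints, which is why the old pdClock/grpcPool/regionCache setup and teardown disappear from this file. A hedged sketch of the new construction path, matching the NewCapture signature in the diff; the endpoint value is made up and the etcd client and gRPC wrapper are assumed to be built exactly as before:

```go
// Illustrative only: shows the new NewCapture signature from the diff above.
package server // illustrative placement

import (
	"github.com/pingcap/tiflow/cdc/capture"
	"github.com/pingcap/tiflow/pkg/etcd"
	"github.com/pingcap/tiflow/pkg/p2p"
)

func newCaptureFromEndpoints(etcdCli *etcd.CDCEtcdClient, grpcSvc *p2p.ServerWrapper) *capture.Capture {
	pdEndpoints := []string{"http://127.0.0.1:2379"} // made-up endpoint
	// The capture builds its upstream resources lazily inside reset(),
	// via UpstreamManager.Add(upstream.DefaultClusterID, pdEndpoints).
	return capture.NewCapture(pdEndpoints, etcdCli, grpcSvc)
}
```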
1 change: 1 addition & 0 deletions cdc/model/changefeed.go
@@ -114,6 +114,7 @@ func (s FeedState) IsNeeded(need string) bool {

// ChangeFeedInfo describes the detail of a ChangeFeed
type ChangeFeedInfo struct {
ClusterID uint64 `json:"cluster-id"`
SinkURI string `json:"sink-uri"`
Opts map[string]string `json:"opts"`
CreateTime time.Time `json:"create-time"`
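The only model change is the new ClusterID field, serialized as cluster-id, which records which upstream a changefeed belongs to. A throwaway example of what that looks like on the wire; the ID value and sink URI are arbitrary and every other field is left at its zero value:

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"
)

func main() {
	info := model.ChangeFeedInfo{
		ClusterID: 1,              // arbitrary example ID
		SinkURI:   "blackhole://", // arbitrary example sink
	}
	b, err := json.Marshal(&info)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // output contains "cluster-id":1 alongside the existing keys
}
```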
32 changes: 18 additions & 14 deletions cdc/owner/changefeed.go
@@ -33,6 +33,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
@@ -42,10 +43,10 @@ type changefeed struct {
id model.ChangeFeedID
state *orchestrator.ChangefeedReactorState

upStream *upstream.Upstream
scheduler scheduler
barriers *barriers
feedStateManager *feedStateManager
gcManager gc.Manager
redoManager redo.LogManager

schema *schemaWrap4Owner
@@ -80,19 +81,19 @@ type changefeed struct {
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricsChangefeedTickDuration prometheus.Observer

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newDDLPuller func(ctx cdcContext.Context, upStream *upstream.Upstream, startTs uint64) (DDLPuller, error)
newSink func() DDLSink
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler, error)
}

func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
func newChangefeed(id model.ChangeFeedID, upStream *upstream.Upstream) *changefeed {
c := &changefeed{
id: id,
// The scheduler will be created lazily.
scheduler: nil,
barriers: newBarriers(),
feedStateManager: newFeedStateManager(),
gcManager: gcManager,
upStream: upStream,

errCh: make(chan error, defaultErrChSize),
cancel: func() {},
@@ -105,19 +106,22 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
}

func newChangefeed4Test(
id model.ChangeFeedID, gcManager gc.Manager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
id model.ChangeFeedID, upStream *upstream.Upstream,
newDDLPuller func(ctx cdcContext.Context, upStream *upstream.Upstream, startTs uint64) (DDLPuller, error),
newSink func() DDLSink,
) *changefeed {
c := newChangefeed(id, gcManager)
c := newChangefeed(id, upStream)
c.newDDLPuller = newDDLPuller
c.newSink = newSink
return c
}

func (c *changefeed) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) {
startTime := time.Now()

// skip this tick
if !c.upStream.IsNormal() {
return
}
ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
c.errCh <- errors.Trace(err)
return nil
@@ -162,7 +166,7 @@ func (c *changefeed) checkStaleCheckpointTs(ctx cdcContext.Context, checkpointTs
failpoint.Inject("InjectChangefeedFastFailError", func() error {
return cerror.ErrGCTTLExceeded.FastGen("InjectChangefeedFastFailError")
})
if err := c.gcManager.CheckStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
if err := c.upStream.GCManager.CheckStaleCheckpointTs(ctx, c.id, checkpointTs); err != nil {
return errors.Trace(err)
}
}
@@ -238,7 +242,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
return errors.Trace(err)
}

pdTime, _ := ctx.GlobalVars().PDClock.CurrentTime()
pdTime, _ := c.upStream.PDClock.CurrentTime()
currentTs := oracle.GetPhysical(pdTime)

// CheckpointCannotProceed implies that not all tables are being replicated normally,
@@ -300,7 +304,7 @@ LOOP:
// See more gc doc.
ensureTTL := int64(10 * 60)
err := gc.EnsureChangefeedStartTsSafety(
ctx, ctx.GlobalVars().PDClient, c.state.ID, ensureTTL, checkpointTs)
ctx, c.upStream.PDClient, c.state.ID, ensureTTL, checkpointTs)
if err != nil {
return errors.Trace(err)
}
@@ -318,20 +322,19 @@ LOOP:
// So we need to process all DDLs from the range [checkpointTs, ...), but since the semantics of start-ts requires
// the lower bound of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the start-ts to initialize
// the schema cache.
c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage,
c.schema, err = newSchemaWrap4Owner(c.upStream.KVStorage,
checkpointTs-1, c.state.Info.Config, ctx.ChangefeedVars().ID)
if err != nil {
return errors.Trace(err)
}

cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel

c.sink = c.newSink()
c.sink.run(cancelCtx, cancelCtx.ChangefeedVars().ID, cancelCtx.ChangefeedVars().Info)

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
c.ddlPuller, err = c.newDDLPuller(cancelCtx, c.upStream, checkpointTs-1)
if err != nil {
return errors.Trace(err)
}
@@ -650,6 +653,7 @@ func (c *changefeed) Close(ctx cdcContext.Context) {
startTime := time.Now()

c.releaseResources(ctx)

costTime := time.Since(startTime)
if costTime > changefeedLogsWarnDuration {
log.Warn("changefeed close took too long",
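In the owner, each changefeed is now constructed with its *upstream.Upstream instead of a bare gc.Manager: ticks are skipped while the upstream is not yet healthy, and the GC safety check, PD clock, KV storage, and DDL puller all go through that handle. A hypothetical helper showing how the owner could wire this up; newChangefeedForCluster does not exist in the PR, while Manager.Get, upstream.DefaultClusterID, and the newChangefeed(id, upStream) signature do:

```go
// Hypothetical helper in the same package as changefeed.go.
package owner

import (
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/upstream"
)

func newChangefeedForCluster(m *upstream.Manager, id model.ChangeFeedID) *changefeed {
	// Today everything maps to the default cluster; a per-changefeed ClusterID
	// (see the cdc/model change above) can be plugged in here later.
	up := m.Get(upstream.DefaultClusterID)
	return newChangefeed(id, up)
}
```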