Merge branch 'release-5.0' into cherry-pick-3540-to-release-5.0
overvenus authored Dec 10, 2021
2 parents 4a86f9e + 7255fc7 commit ef1d578
Showing 225 changed files with 468 additions and 129 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -135,10 +135,10 @@ integration_test_build: check_failpoint_ctl
integration_test: integration_test_mysql

integration_test_mysql:
tests/run.sh mysql "$(CASE)"
tests/integration_tests/run.sh mysql "$(CASE)"

integration_test_kafka: check_third_party_binary
tests/run.sh kafka "$(CASE)"
tests/integration_tests/run.sh kafka "$(CASE)"

fmt: tools/bin/gofumports tools/bin/shfmt
@echo "gofmt (simplify)"
33 changes: 23 additions & 10 deletions cdc/capture/capture.go
@@ -32,6 +32,7 @@ import (
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
pd "github.com/tikv/pd/client"
@@ -54,10 +55,11 @@ type Capture struct {
session *concurrency.Session
election *concurrency.Election

pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool
pdClient pd.Client
kvStorage tidbkv.Storage
etcdClient *kv.CDCEtcdClient
grpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer

cancel context.CancelFunc

@@ -98,6 +100,12 @@ func (c *Capture) reset(ctx context.Context) error {
}
c.session = sess
c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey)

if c.TimeAcquirer != nil {
c.TimeAcquirer.Stop()
}
c.TimeAcquirer = pdtime.NewTimeAcquirer(c.pdClient)

if c.grpcPool != nil {
c.grpcPool.Close()
}
@@ -146,11 +154,12 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
TimeAcquirer: c.TimeAcquirer,
})
err := c.register(ctx)
if err != nil {
@@ -164,7 +173,7 @@ func (c *Capture) run(stdCtx context.Context) error {
cancel()
}()
wg := new(sync.WaitGroup)
wg.Add(3)
wg.Add(4)
var ownerErr, processorErr error
go func() {
defer wg.Done()
@@ -186,6 +195,10 @@ func (c *Capture) run(stdCtx context.Context) error {
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval)
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
defer wg.Done()
c.TimeAcquirer.Run(ctx)
}()
go func() {
defer wg.Done()
c.grpcPool.RecycleConn(ctx)
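
Taken together, the capture.go hunks wire a PD-backed clock into the capture: reset() stops any previous TimeAcquirer and builds a new one from the PD client, run() drives it in its own goroutine (hence wg.Add(4)), and the global context hands it to every component. The interface itself is not part of this diff; the following is only a sketch inferred from the calls used here (NewTimeAcquirer, Run, CurrentTimeFromCached, Stop), and the real definition in pkg/pdtime may differ.

package pdtime

import (
    "context"
    "time"
)

// TimeAcquirer keeps a cached "current time" that is periodically refreshed
// from PD, so hot paths can read an approximate PD time without a TSO round
// trip on every tick. Method set inferred from this commit, not copied from it.
type TimeAcquirer interface {
    // Run refreshes the cached time until ctx is cancelled.
    Run(ctx context.Context)
    // CurrentTimeFromCached returns the last cached PD time; callers in this
    // commit ignore the error and accept a slightly stale or local value.
    CurrentTimeFromCached() (time.Time, error)
    // Stop releases the acquirer's resources; reset() calls it before
    // building a new acquirer.
    Stop()
}
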
13 changes: 6 additions & 7 deletions cdc/owner/changefeed.go
@@ -16,7 +16,6 @@ package owner
import (
"context"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -178,7 +177,9 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
return errors.Trace(err)
}
if shouldUpdateState {
c.updateStatus(barrierTs)
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
currentTs := oracle.GetPhysical(pdTime)
c.updateStatus(currentTs, barrierTs)
}
return nil
}
@@ -438,7 +439,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
return done, nil
}

func (c *changefeed) updateStatus(barrierTs model.Ts) {
func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) {
resolvedTs := barrierTs
for _, position := range c.state.TaskPositions {
if resolvedTs > position.ResolvedTs {
@@ -470,12 +471,10 @@ }
}
return status, changed, nil
})

phyTs := oracle.ExtractPhysical(checkpointTs)

c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs))
// It is more accurate to get tso from PD, but in most cases since we have
// deployed NTP service, a little bias is acceptable here.
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3)
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3)
}

func (c *changefeed) Close() {
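
The substance of this change is the lag calculation in updateStatus: the physical part of the checkpoint TSO is now compared against the cached PD time instead of the local wall clock, so the gauge no longer depends on how well NTP keeps the capture's clock in sync. A small self-contained sketch of the arithmetic (helper names are illustrative, not the actual ticdc ones):

package main

import "fmt"

// lagSeconds mirrors the expression used above: both arguments are physical
// timestamps in milliseconds, and the result is the lag in seconds.
func lagSeconds(currentTsMs, checkpointPhyTsMs int64) float64 {
    return float64(currentTsMs-checkpointPhyTsMs) / 1e3
}

func main() {
    // Example: the checkpoint's physical time is 5500ms behind the cached PD time.
    checkpointPhyTs := int64(1639123200000)
    currentTs := checkpointPhyTs + 5500
    fmt.Println(lagSeconds(currentTs, checkpointPhyTs)) // 5.5
}
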
2 changes: 2 additions & 0 deletions cdc/owner/changefeed_test.go
@@ -26,6 +26,7 @@ import (
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
"github.com/pingcap/ticdc/pkg/orchestrator"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/ticdc/pkg/version"
@@ -216,6 +217,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
})
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: "changefeed-id-test",
1 change: 1 addition & 0 deletions cdc/processor/manager.go
@@ -82,6 +82,7 @@ func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState)
if err := m.handleCommand(); err != nil {
return state, err
}

captureID := ctx.GlobalVars().CaptureInfo.ID
var inactiveChangefeedCount int
for changefeedID, changefeedState := range globalState.Changefeeds {
27 changes: 14 additions & 13 deletions cdc/processor/processor.go
@@ -161,7 +161,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS
if !p.checkChangefeedNormal() {
return nil, cerror.ErrAdminStopProcessor.GenWithStackByArgs()
}
if skip := p.checkPosition(); skip {
// we should skip this tick after creating a task position
if p.createTaskPosition() {
return p.changefeed, nil
}
if err := p.handleErrorCh(ctx); err != nil {
@@ -176,7 +177,11 @@ func (p *processor) tick(ctx cdcContext.Context, state *model.ChangefeedReactorS
if err := p.checkTablesNum(ctx); err != nil {
return nil, errors.Trace(err)
}
p.handlePosition()
// no need to check the error here, because local time is used when an
// error is returned, which is acceptable
pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()

p.handlePosition(oracle.GetPhysical(pdTime))
p.pushResolvedTs2Table()
p.handleWorkload()
p.doGCSchemaStorage()
@@ -194,10 +199,10 @@ func (p *processor) checkChangefeedNormal() bool {
return true
}

// checkPosition create a new task position, and put it into the etcd state.
// task position maybe be not exist only when the processor is running first time.
func (p *processor) checkPosition() (skipThisTick bool) {
if p.changefeed.TaskPositions[p.captureInfo.ID] != nil {
// createTaskPosition creates a new task position if one does not exist yet.
// The task position is missing only on the processor's very first tick.
func (p *processor) createTaskPosition() (skipThisTick bool) {
if _, exist := p.changefeed.TaskPositions[p.captureInfo.ID]; exist {
return false
}
if p.initialized {
@@ -559,7 +564,7 @@ func (p *processor) checkTablesNum(ctx cdcContext.Context) error {
}

// handlePosition calculates the local resolved ts and local checkpoint ts
func (p *processor) handlePosition() {
func (p *processor) handlePosition(currentTs int64) {
minResolvedTs := uint64(math.MaxUint64)
if p.schemaStorage != nil {
minResolvedTs = p.schemaStorage.ResolvedTs()
@@ -580,15 +585,11 @@ }
}

resolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
// It is more accurate to get tso from PD, but in most cases we have
// deployed NTP service, a little bias is acceptable here.
p.metricResolvedTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-resolvedPhyTs) / 1e3)
p.metricResolvedTsLagGauge.Set(float64(currentTs-resolvedPhyTs) / 1e3)
p.metricResolvedTsGauge.Set(float64(resolvedPhyTs))

checkpointPhyTs := oracle.ExtractPhysical(minCheckpointTs)
// It is more accurate to get tso from PD, but in most cases we have
// deployed NTP service, a little bias is acceptable here.
p.metricCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-checkpointPhyTs) / 1e3)
p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3)
p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs))

// minResolvedTs and minCheckpointTs may be less than the global resolved ts and global checkpoint ts when a new table is added, because the startTs of the new table is less than the global checkpoint ts.
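
The renamed createTaskPosition makes the first-tick behaviour explicit: when this capture has no task position yet, one is created and the rest of the tick is skipped, presumably so the new position reaches etcd before any further work depends on it. A simplified sketch of that create-then-skip pattern (types and names are illustrative, not the actual processor ones):

package main

import "fmt"

type position struct {
    checkPointTs uint64
    resolvedTs   uint64
}

type processor struct {
    captureID string
    positions map[string]*position
}

// ensureTaskPosition reports whether the current tick should be cut short
// because the position had to be created first.
func (p *processor) ensureTaskPosition(startTs uint64) (skipThisTick bool) {
    if _, exist := p.positions[p.captureID]; exist {
        return false
    }
    p.positions[p.captureID] = &position{checkPointTs: startTs, resolvedTs: startTs}
    return true
}

func main() {
    p := &processor{captureID: "capture-1", positions: map[string]*position{}}
    fmt.Println(p.ensureTaskPosition(100)) // true: position created, skip this tick
    fmt.Println(p.ensureTaskPosition(100)) // false: position exists, proceed
}
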
16 changes: 13 additions & 3 deletions cdc/sink/producer/kafka/kafka.go
@@ -537,10 +537,21 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
}
config.Version = version
// See: https://kafka.apache.org/documentation/#replication
// When one of the brokers in a Kafka cluster is down, the partition leaders in this broker is broken, Kafka will election a new partition leader and replication logs, this process will last from a few seconds to a few minutes. Kafka cluster will not provide a writing service in this process.
// Time out in one minute(120 * 500ms).
// When one of the brokers in a Kafka cluster is down, the partition leaders
// on that broker become unavailable; Kafka elects new partition leaders and
// replicates their logs, which can take from a few seconds to a few minutes.
// The Kafka cluster does not accept writes while this is in progress.
// Time out after one minute.
config.Metadata.Retry.Max = 120
config.Metadata.Retry.Backoff = 500 * time.Millisecond
// If it is not set, this means a metadata request against an unreachable
// cluster (all brokers are unreachable or unresponsive) can take up to
// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) +
// Metadata.Retry.Backoff * Metadata.Retry.Max`
// to fail.
// See: https://github.com/Shopify/sarama/issues/765
// and https://github.com/pingcap/ticdc/issues/3352.
config.Metadata.Timeout = 1 * time.Minute

config.Producer.Partitioner = sarama.NewManualPartitioner
config.Producer.MaxMessageBytes = c.MaxMessageBytes
@@ -580,7 +591,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
return nil, errors.Trace(err)
}
}

if c.SaslScram != nil && len(c.SaslScram.SaslUser) != 0 {
config.Net.SASL.Enable = true
config.Net.SASL.User = c.SaslScram.SaslUser
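
The new Metadata.Timeout closes the gap described in the comment: the retry settings only bound the leader-election case, while a fully unreachable cluster could otherwise block for Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) plus the accumulated backoff. A stand-alone sketch of just these metadata settings with the Shopify/sarama client (the rest of newSaramaConfig is omitted):

package main

import (
    "fmt"
    "time"

    "github.com/Shopify/sarama"
)

func newMetadataConfig() *sarama.Config {
    config := sarama.NewConfig()
    // Survive a partition-leader election: 120 retries * 500ms backoff = 60s.
    config.Metadata.Retry.Max = 120
    config.Metadata.Retry.Backoff = 500 * time.Millisecond
    // Bound each metadata request itself, so an unreachable cluster fails in
    // about a minute instead of exhausting every retry against every broker.
    config.Metadata.Timeout = 1 * time.Minute
    return config
}

func main() {
    cfg := newMetadataConfig()
    fmt.Println(cfg.Metadata.Retry.Max, cfg.Metadata.Retry.Backoff, cfg.Metadata.Timeout)
}
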
8 changes: 4 additions & 4 deletions metrics/alertmanager/ticdc.rules.yml
@@ -14,24 +14,24 @@ groups:
summary: cdc cluster has multiple owners

- alert: cdc_checkpoint_high_delay
expr: (time() - ticdc_processor_checkpoint_ts / 1000) > 600
expr: ticdc_processor_checkpoint_ts_lag > 600
for: 1m
labels:
env: ENV_LABELS_ENV
level: critical
expr: (time() - ticdc_processor_checkpoint_ts / 1000) > 600
expr: ticdc_processor_checkpoint_ts_lag > 600
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
value: '{{ $value }}'
summary: cdc processor checkpoint delay more than 10 minutes

- alert: cdc_resolvedts_high_delay
expr: (time() - ticdc_processor_resolved_ts / 1000) > 300
expr: ticdc_processor_resolved_ts_lag > 300
for: 1m
labels:
env: ENV_LABELS_ENV
level: critical
expr: (time() - ticdc_processor_resolved_ts / 1000) > 300
expr: ticdc_processor_resolved_ts_lag > 300
annotations:
description: 'cluster: ENV_LABELS_ENV, instance: {{ $labels.instance }}, values: {{ $value }}'
value: '{{ $value }}'
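
The rules now read the dedicated lag gauges, which processor.go above sets from PD time, instead of deriving the delay from the checkpoint gauge and the Prometheus server's own clock. A sketch of how such a gauge is typically exported with client_golang; the label set and help text here are assumptions, not the exact ticdc definitions:

package main

import "github.com/prometheus/client_golang/prometheus"

var checkpointTsLagGauge = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{
        Namespace: "ticdc",
        Subsystem: "processor",
        Name:      "checkpoint_ts_lag",
        Help:      "checkpoint ts lag of processors, in seconds",
    },
    []string{"changefeed", "capture"},
)

func main() {
    prometheus.MustRegister(checkpointTsLagGauge)
    // The cdc_checkpoint_high_delay alert above fires when this value stays
    // over 600 (seconds) for one minute.
    checkpointTsLagGauge.WithLabelValues("cf-test", "capture-1").Set(12.5)
}
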
16 changes: 9 additions & 7 deletions pkg/context/context.go
@@ -18,11 +18,11 @@ import (
"log"
"time"

"github.com/pingcap/ticdc/pkg/version"

"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/pdtime"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
pd "github.com/tikv/pd/client"
@@ -33,11 +33,12 @@
// the lifecycle of vars in the GlobalVars should be aligned with the ticdc server process.
// All fields in Vars should be READ-ONLY and THREAD-SAFE
type GlobalVars struct {
PDClient pd.Client
KVStorage tidbkv.Storage
CaptureInfo *model.CaptureInfo
EtcdClient *kv.CDCEtcdClient
GrpcPool kv.GrpcPool
PDClient pd.Client
KVStorage tidbkv.Storage
CaptureInfo *model.CaptureInfo
EtcdClient *kv.CDCEtcdClient
GrpcPool kv.GrpcPool
TimeAcquirer pdtime.TimeAcquirer
}

// ChangefeedVars contains some vars which can be used anywhere in a pipeline
@@ -184,6 +185,7 @@ func NewBackendContext4Test(withChangefeedVars bool) Context {
AdvertiseAddr: "127.0.0.1:0000",
Version: version.ReleaseVersion,
},
TimeAcquirer: pdtime.NewTimeAcquirer4Test(),
})
if withChangefeedVars {
ctx = WithChangefeedVars(ctx, &ChangefeedVars{
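
With TimeAcquirer carried in GlobalVars, any component that holds a cdcContext.Context can read the cached PD time the same way the changefeed and processor callers above do. The helper below is illustrative only and not part of the commit:

package example

import (
    cdcContext "github.com/pingcap/ticdc/pkg/context"
    "github.com/pingcap/tidb/store/tikv/oracle"
)

// approxPDPhysical returns the cached PD time as a physical timestamp in
// milliseconds; the error is ignored, matching the callers in this commit,
// which tolerate a stale or local fallback value.
func approxPDPhysical(ctx cdcContext.Context) int64 {
    pdTime, _ := ctx.GlobalVars().TimeAcquirer.CurrentTimeFromCached()
    return oracle.GetPhysical(pdTime)
}
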
6 changes: 4 additions & 2 deletions pkg/orchestrator/etcd_worker_test.go
@@ -18,6 +18,7 @@ import (
"encoding/json"
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"
@@ -223,7 +224,6 @@ func (s *etcdWorkerSuite) TestEtcdSum(c *check.C) {
defer func() {
_ = cli.Unwrap().Close()
}()

_, err := cli.Put(ctx, testEtcdKeyPrefix+"/sum", "0")
c.Check(err, check.IsNil)

@@ -272,7 +272,9 @@ }
}

err = errg.Wait()
if err != nil && (errors.Cause(err) == context.DeadlineExceeded || errors.Cause(err) == context.Canceled) {
if err != nil && (errors.Cause(err) == context.DeadlineExceeded ||
errors.Cause(err) == context.Canceled ||
strings.Contains(err.Error(), "etcdserver: request timeout")) {
return
}
c.Check(err, check.IsNil)