Skip to content

Commit

Permalink
Merge branch 'release-5.2' into cherry-pick-4085-to-release-5.2
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoxinyu authored Apr 15, 2022
2 parents 7d94c10 + 2746528 commit 5de0d2b
Show file tree
Hide file tree
Showing 143 changed files with 6,763 additions and 2,059 deletions.
30 changes: 20 additions & 10 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,17 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *kv.CDC
}

func (c *Capture) reset(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
}

c.captureMu.Lock()
defer c.captureMu.Unlock()
conf := config.GetGlobalServerConfig()
c.info = &model.CaptureInfo{
ID: uuid.New().String(),
AdvertiseAddr: conf.AdvertiseAddr,
Expand All @@ -93,11 +101,7 @@ func (c *Capture) reset(ctx context.Context) error {
// It can't be handled even after it fails, so we ignore it.
_ = c.session.Close()
}
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}

c.session = sess
c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey)

Expand Down Expand Up @@ -192,7 +196,7 @@ func (c *Capture) run(stdCtx context.Context) error {
// when the etcd worker of processor returns an error, it means that the processor throws an unrecoverable serious errors
// (recoverable errors are intercepted in the processor tick)
// so we should also stop the processor and let capture restart or exit
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval)
processorErr = c.runEtcdWorker(ctx, c.processorManager, model.NewGlobalState(), processorFlushInterval, "processor")
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
go func() {
Expand Down Expand Up @@ -260,7 +264,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
log.Info("campaign owner successfully", zap.String("capture-id", c.info.ID))
owner := c.newOwner(c.pdClient)
c.setOwner(owner)
err = c.runEtcdWorker(ctx, owner, model.NewGlobalState(), ownerFlushInterval)
err = c.runEtcdWorker(ctx, owner, model.NewGlobalState(), ownerFlushInterval, "owner")
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
// if owner exits, resign the owner key
Expand All @@ -276,13 +280,19 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
}
}

func (c *Capture) runEtcdWorker(ctx cdcContext.Context, reactor orchestrator.Reactor, reactorState orchestrator.ReactorState, timerInterval time.Duration) error {
func (c *Capture) runEtcdWorker(
ctx cdcContext.Context,
reactor orchestrator.Reactor,
reactorState orchestrator.ReactorState,
timerInterval time.Duration,
role string,
) error {
etcdWorker, err := orchestrator.NewEtcdWorker(ctx.GlobalVars().EtcdClient.Client, kv.EtcdKeyBase, reactor, reactorState)
if err != nil {
return errors.Trace(err)
}
captureAddr := c.info.AdvertiseAddr
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr); err != nil {
if err := etcdWorker.Run(ctx, c.session, timerInterval, captureAddr, role); err != nil {
// We check ttl of lease instead of check `session.Done`, because
// `session.Done` is only notified when etcd client establish a
// new keepalive request, there could be a time window as long as
Expand Down
3 changes: 2 additions & 1 deletion cdc/capture/http_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ var httpBadRequestError = []*errors.Error{
cerror.ErrAPIInvalidParam, cerror.ErrSinkURIInvalid, cerror.ErrStartTsBeforeGC,
cerror.ErrChangeFeedNotExists, cerror.ErrTargetTsBeforeStartTs, cerror.ErrTableIneligible,
cerror.ErrFilterRuleInvalid, cerror.ErrChangefeedUpdateRefused, cerror.ErrMySQLConnectionError,
cerror.ErrMySQLInvalidConfig,
cerror.ErrMySQLInvalidConfig, cerror.ErrCaptureNotExist, cerror.ErrTaskStatusNotExists,
cerror.ErrTaskPositionNotExists,
}

// IsHTTPBadRequestError check if a error is a http bad request error
Expand Down
14 changes: 2 additions & 12 deletions cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand All @@ -42,8 +41,6 @@ const (
apiOpVarCaptureID = "capture_id"
// forWardFromCapture is a header to be set when a request is forwarded from another capture
forWardFromCapture = "TiCDC-ForwardFromCapture"
// getOwnerRetryMaxTime is the retry max time to get an owner
getOwnerRetryMaxTime = 3
)

// HTTPHandler is a HTTPHandler of capture
Expand Down Expand Up @@ -698,16 +695,9 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {

var owner *model.CaptureInfo
// get owner
err := retry.Do(ctx, func() error {
o, err := h.capture.GetOwner(ctx)
if err != nil {
log.Info("get owner failed, retry later", zap.Error(err))
return err
}
owner = o
return nil
}, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime))
owner, err := h.capture.GetOwner(ctx)
if err != nil {
log.Info("get owner failed", zap.Error(err))
_ = c.Error(err)
return
}
Expand Down
13 changes: 10 additions & 3 deletions cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return nil, err
}
// set sortEngine and EnableOldValue
cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos)
cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -128,7 +128,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
ineligibleTables, _, err := VerifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -201,7 +201,9 @@ func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch
return newInfo, nil
}

func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
// VerifyTables catalog tables specified by ReplicaConfig into
// eligible (has an unique index or primary key) and ineligible tables.
func VerifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
filter, err := filter.NewFilter(replicaConfig)
if err != nil {
return nil, nil, errors.Trace(err)
Expand All @@ -219,6 +221,11 @@ func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, s
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
}
// Sequence is not supported yet, TiCDC needs to filter all sequence tables.
// See https://github.com/pingcap/tiflow/issues/4559
if tableInfo.IsSequence() {
continue
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
Expand Down
6 changes: 2 additions & 4 deletions cdc/entry/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (

func Test(t *testing.T) { check.TestingT(t) }

type codecSuite struct {
}
type codecSuite struct{}

var _ = check.Suite(&codecSuite{})

Expand All @@ -43,8 +42,7 @@ func (s *codecSuite) TestDecodeRecordKey(c *check.C) {
c.Assert(len(key), check.Equals, 0)
}

type decodeMetaKeySuite struct {
}
type decodeMetaKeySuite struct{}

var _ = check.Suite(&decodeMetaKeySuite{})

Expand Down
93 changes: 60 additions & 33 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ func UnmarshalDDL(raw *model.RawKVEntry) (*timodel.Job, error) {
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get new DDL job", zap.String("detail", job.String()))
if !job.IsDone() && !job.IsSynced() {
return nil, nil
}
Expand All @@ -311,20 +312,21 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
colName := colInfo.Name.O
colDatums, exist := datums[colInfo.ID]
var colValue interface{}
if !exist && !fillWithDefaultValue {
continue
}
var err error
var warn string
if exist {
var err error
var warn string
colValue, warn, err = formatColVal(colDatums, colInfo.Tp)
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
} else if fillWithDefaultValue {
colValue = getDefaultOrZeroValue(colInfo)
} else {
continue
colValue, warn, err = getDefaultOrZeroValue(colInfo)
}
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
cols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{
Name: colName,
Expand Down Expand Up @@ -408,7 +410,9 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr

var emptyBytes = make([]byte, 0)

func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) {
func formatColVal(datum types.Datum, tp byte) (
value interface{}, warn string, err error,
) {
if datum.IsNull() {
return nil, "", nil
}
Expand Down Expand Up @@ -448,35 +452,58 @@ func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, e
}
return v, warn, nil
default:
// NOTICE: GetValue() may return some types that go sql not support, which will cause sink DML fail
// Make specified convert upper if you need
// Go sql support type ref to: https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
return datum.GetValue(), "", nil
}
}

func getDefaultOrZeroValue(col *timodel.ColumnInfo) interface{} {
// see https://github.com/pingcap/tidb/issues/9304
// must use null if TiDB not write the column value when default value is null
// and the value is null
if !mysql.HasNotNullFlag(col.Flag) {
d := types.NewDatum(nil)
return d.GetValue()
// Scenarios when call this function:
// (1) column define default null at creating + insert without explicit column
// (2) alter table add column default xxx + old existing data
// (3) amend + insert without explicit column + alter table add column default xxx
// (4) online DDL drop column + data insert at state delete-only
//
// getDefaultOrZeroValue return interface{} need to meet to require type in
// https://github.com/golang/go/blob/go1.17.4/src/database/sql/driver/types.go#L236
// Supported type is: nil, basic type(Int, Int8,..., Float32, Float64, String), Slice(uint8), other types not support
func getDefaultOrZeroValue(col *timodel.ColumnInfo) (interface{}, string, error) {
var d types.Datum
// NOTICE: SHOULD use OriginDefaultValue here, more info pls ref to
// https://github.com/pingcap/tiflow/issues/4048
// FIXME: Too many corner cases may hit here, like type truncate, timezone
// (1) If this column is uk(no pk), will cause data inconsistency in Scenarios(2)
// (2) If not fix here, will cause data inconsistency in Scenarios(3) directly
// Ref: https://github.com/pingcap/tidb/blob/d2c352980a43bb593db81fd1db996f47af596d91/table/column.go#L489
if col.GetOriginDefaultValue() != nil {
d = types.NewDatum(col.GetOriginDefaultValue())
return d.GetValue(), "", nil
}

if col.GetDefaultValue() != nil {
d := types.NewDatum(col.GetDefaultValue())
return d.GetValue()
}
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d := types.NewDatum(col.FieldType.Elems[0])
return d.GetValue()
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes
if !mysql.HasNotNullFlag(col.Flag) {
// NOTICE: NotNullCheck need do after OriginDefaultValue check, as when TiDB meet "amend + add column default xxx",
// ref: https://github.com/pingcap/ticdc/issues/3929
// must use null if TiDB not write the column value when default value is null
// and the value is null, see https://github.com/pingcap/tidb/issues/9304
d = types.NewDatum(nil)
} else {
switch col.Tp {
case mysql.TypeEnum:
// For enum type, if no default value and not null is set,
// the default value is the first element of the enum list
d = types.NewDatum(col.FieldType.Elems[0])
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar:
return emptyBytes, "", nil
default:
d = table.GetZeroValue(col)
if d.IsNull() {
log.Error("meet unsupported column type", zap.String("column info", col.String()))
}
}
}

d := table.GetZeroValue(col)
return d.GetValue()
return formatColVal(d, col.Tp)
}

// DecodeTableID decodes the raw key to a table ID
Expand Down
Loading

0 comments on commit 5de0d2b

Please sign in to comment.