Skip to content

Commit

Permalink
changefeed(ticdc): remove some panic log (#10277)
Browse files Browse the repository at this point in the history
close #10361
  • Loading branch information
sdojjy authored Dec 26, 2023
1 parent 5264931 commit 17e16d3
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 13 deletions.
3 changes: 2 additions & 1 deletion cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,12 +683,13 @@ func (s *eventFeedSession) requestRegionToStore(
pendingRegions, ok := storePendingRegions[storeAddr]
if !ok {
// Should never happen
log.Panic("pending regions is not found for store",
log.Error("pending regions is not found for store",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID),
zap.Int64("tableID", s.tableID),
zap.String("tableName", s.tableName),
zap.String("store", storeAddr))
return cerror.ErrUnexpected.FastGenByArgs("pending regions is not found for store")
}

state := newRegionFeedState(sri, requestID)
Expand Down
3 changes: 2 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,8 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context,
case finishBarrier:
c.feedStateManager.MarkFinished()
default:
log.Panic("Unknown barrier type", zap.Int("barrierType", int(barrierTp)))
log.Error("Unknown barrier type", zap.Int("barrierType", int(barrierTp)))
return cerror.ErrUnexpected.FastGenByArgs("Unknown barrier type")
}
}

Expand Down
3 changes: 2 additions & 1 deletion cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,11 @@ func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {
return "", errors.Trace(err)
}
if len(stms) != 1 {
log.Panic("invalid ddlQuery statement size",
log.Error("invalid ddlQuery statement size",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.String("ddlQuery", ddl.Query))
return "", cerror.ErrUnexpected.FastGenByArgs("invalid ddlQuery statement size")
}
var sb strings.Builder
// translate TiDB feature to special comment
Expand Down
11 changes: 5 additions & 6 deletions cdc/owner/ddl_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,9 @@ func TestAddSpecialComment(t *testing.T) {
require.Nil(t, err)
require.Equal(t, ca.result, re)
}
require.Panics(t, func() {
_, _ = s.addSpecialComment(&model.DDLEvent{
Query: "alter table t force, auto_increment = 12;alter table t force, " +
"auto_increment = 12;",
})
}, "invalid ddlQuery statement size")
_, err := s.addSpecialComment(&model.DDLEvent{
Query: "alter table t force, auto_increment = 12;alter table t force, " +
"auto_increment = 12;",
})
require.NotNil(t, err)
}
6 changes: 4 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,14 @@ func (p *processor) AddTableSpan(

startTs := checkpoint.CheckpointTs
if startTs == 0 {
log.Panic("table start ts must not be 0",
log.Error("table start ts must not be 0",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Stringer("span", &span),
zap.Uint64("checkpointTs", startTs),
zap.Bool("isPrepare", isPrepare))
return false, cerror.ErrUnexpected.FastGenByArgs("table start ts must not be 0")
}

state, alreadyExist := p.sinkManager.r.GetTableState(span)
Expand Down Expand Up @@ -493,10 +494,11 @@ func (p *processor) Tick(
return err, nil
}
if p.upstream.IsClosed() {
log.Panic("upstream is closed",
log.Error("upstream is closed",
zap.Uint64("upstreamID", p.upstream.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID))
return cerror.ErrUnexpected.FastGenByArgs("upstream is closed"), nil
}
// skip this tick
if !p.upstream.IsNormal() {
Expand Down
3 changes: 2 additions & 1 deletion cdc/redo/writer/memory/file_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ func (f *fileWorkerGroup) bgWriteLogs(
return errors.Trace(egCtx.Err())
case event := <-inputCh:
if event == nil {
log.Panic("inputCh of redo file worker is closed unexpectedly")
log.Error("inputCh of redo file worker is closed unexpectedly")
return errors.ErrUnexpected.FastGenByArgs("inputCh of redo file worker is closed unexpectedly")
}

if event.data != nil {
Expand Down
3 changes: 2 additions & 1 deletion cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,10 +733,11 @@ func (s *mysqlBackend) sequenceExecute(

func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *preparedDMLs) error {
if len(dmls.sqls) != len(dmls.values) {
log.Panic("unexpected number of sqls and values",
log.Error("unexpected number of sqls and values",
zap.String("changefeed", s.changefeed),
zap.Strings("sqls", dmls.sqls),
zap.Any("values", dmls.values))
return cerror.ErrUnexpected.FastGenByArgs("unexpected number of sqls and values")
}

start := time.Now()
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1001,6 +1001,11 @@ error = '''
url format is invalid
'''

["CDC:ErrUnexpected"]
error = '''
cdc met unexpected error: %s
'''

["CDC:ErrUnknownKVEventType"]
error = '''
unknown kv optype: %s, entry: %v
Expand Down
5 changes: 5 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,4 +973,9 @@ var (
"meta cache is inconsistent: %s",
errors.RFCCodeText("DFLOW:ErrInconsistentMetaCache"),
)

ErrUnexpected = errors.Normalize(
"cdc met unexpected error: %s",
errors.RFCCodeText("CDC:ErrUnexpected"),
)
)

0 comments on commit 17e16d3

Please sign in to comment.