diff --git a/canal/rows.go b/canal/rows.go index 9e5df3981..9f618a9fb 100644 --- a/canal/rows.go +++ b/canal/rows.go @@ -53,6 +53,13 @@ func (r *RowsEvent) handleUnsigned() { for i := 0; i < len(r.Rows); i++ { for _, columnIdx := range r.Table.UnsignedColumns { + // When Canal.delay is big, after call Canal.StartFromGTID(), + // we will get the newest table schema (for example, after DDL "alter table add column xxx unsigned..."), + // but the binlog data can be very old (before DDL "alter table add column xxx unsigned..."), + // results in max(columnIdx) >= len(r.Rows[i]), then r.Rows[i][columnIdx] panic. + if columnIdx >= len(r.Rows[i]) { + continue + } switch value := r.Rows[i][columnIdx].(type) { case int8: r.Rows[i][columnIdx] = uint8(value) diff --git a/canal/rows_test.go b/canal/rows_test.go new file mode 100644 index 000000000..dfbce0b39 --- /dev/null +++ b/canal/rows_test.go @@ -0,0 +1,60 @@ +package canal + +import ( + "testing" + + "github.com/go-mysql-org/go-mysql/replication" + "github.com/go-mysql-org/go-mysql/schema" + "github.com/stretchr/testify/require" +) + +func TestRowsEvent_handleUnsigned(t *testing.T) { + type fields struct { + Table *schema.Table + Action string + Rows [][]interface{} + Header *replication.EventHeader + } + tests := []struct { + name string + fields fields + wantRows [][]interface{} + }{ + { + name: "rows_event_handle_unsigned", + fields: fields{ + Table: &schema.Table{ + // columns 1,3,5,7,9 should be converted from signed to unsigned, + // column 10 is out of range and should be ignored, don't panic. + UnsignedColumns: []int{1, 3, 5, 7, 9, 10}, + }, + Rows: [][]interface{}{{ + int8(8), int8(8), + int16(16), int16(16), + int32(32), int32(32), + int64(64), int64(64), + int(128), int(128)}, + }, + }, + wantRows: [][]interface{}{{ + int8(8), uint8(8), + int16(16), uint16(16), + int32(32), uint32(32), + int64(64), uint64(64), + int(128), uint(128)}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &RowsEvent{ + Table: tt.fields.Table, + Action: tt.fields.Action, + Rows: tt.fields.Rows, + Header: tt.fields.Header, + } + r.handleUnsigned() + require.Equal(t, tt.fields.Rows, tt.wantRows) + }) + } +}