Skip to content

Commit

Permalink
fix error
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Nov 9, 2023
1 parent 2ef4bc2 commit 4362870
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -248,13 +248,20 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
if inErr != nil {
return 0, inErr
}

defer func() {
if inErr == nil {
inErr = writer.Close(ctx)
closeErr := writer.Close(ctx)
if inErr != nil {
log.Error("failed to close writer", zap.Error(closeErr),
zap.Int("workerID", d.id),
zap.Any("table", task.tableInfo.TableName),
zap.String("namespace", d.changeFeedID.Namespace),
zap.String("changefeed", d.changeFeedID.ID))
if inErr == nil {
inErr = closeErr
}
}
// TODO: maybe we should abort the MultipartUpload here.
}()

if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
return 0, inErr
}
Expand Down

0 comments on commit 4362870

Please sign in to comment.