sink(ticdc): add total write bytes counter to sink (#10040) #10567

Merged
14 changes: 8 additions & 6 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
@@ -230,23 +230,24 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
var callbacks []func()
buf := bytes.NewBuffer(make([]byte, 0, task.size))
rowsCnt := 0
bytesCnt := int64(0)
for _, msg := range task.msgs {
d.metricWriteBytes.Add(float64(len(msg.Value)))
bytesCnt += int64(len(msg.Value))
rowsCnt += msg.GetRowsCount()
buf.Write(msg.Value)
callbacks = append(callbacks, msg.Callback)
}

if err := d.statistics.RecordBatchExecution(func() (int, error) {
if err := d.statistics.RecordBatchExecution(func() (int, int64, error) {
if d.config.FlushConcurrency <= 1 {
return rowsCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
return rowsCnt, bytesCnt, d.storage.WriteFile(ctx, path, buf.Bytes())
}

writer, inErr := d.storage.Create(ctx, path, &storage.WriterOption{
Concurrency: d.config.FlushConcurrency,
})
if inErr != nil {
return 0, inErr
return 0, 0, inErr
}

defer func() {
@@ -263,13 +264,14 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
}
}()
if _, inErr = writer.Write(ctx, buf.Bytes()); inErr != nil {
return 0, inErr
return 0, 0, inErr
}
return rowsCnt, nil
return rowsCnt, bytesCnt, nil
}); err != nil {
return err
}

d.metricWriteBytes.Add(float64(bytesCnt))
d.metricFileCount.Add(1)
for _, cb := range callbacks {
if cb != nil {
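Not part of the diff, just context for reviewers: the cloud-storage worker now accumulates the byte count locally and hands it to the statistics callback, so both the sink-local write-bytes metric and the new shared counter move only after a flush succeeds. A minimal sketch of that accumulate-then-report pattern, assuming the metrics package lives at github.com/pingcap/tiflow/cdc/sink/metrics; flushFn and msgs are hypothetical stand-ins for the real worker state:

package sketch

import "github.com/pingcap/tiflow/cdc/sink/metrics"

// writeBatch is an illustrative stand-in for dmlWorker.writeDataFile.
// The real worker writes through d.storage and iterates task.msgs.
func writeBatch(stats *metrics.Statistics, flushFn func([]byte) error, msgs [][]byte) error {
	var (
		buf      []byte
		rowsCnt  int
		bytesCnt int64
	)
	for _, m := range msgs {
		bytesCnt += int64(len(m)) // count bytes while building the batch...
		rowsCnt++                 // (the real code uses msg.GetRowsCount())
		buf = append(buf, m...)
	}
	// ...but report them only when the flush succeeds, so a failed write
	// no longer inflates the write-bytes metrics.
	return stats.RecordBatchExecution(func() (int, int64, error) {
		if err := flushFn(buf); err != nil {
			return 0, 0, err
		}
		return rowsCnt, bytesCnt, nil
	})
}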
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/mq/worker.go
@@ -296,11 +296,11 @@ func (w *worker) sendMessages(ctx context.Context) error {
}
for _, message := range future.Messages {
start := time.Now()
if err := w.statistics.RecordBatchExecution(func() (int, error) {
if err := w.statistics.RecordBatchExecution(func() (int, int64, error) {
if err := w.producer.AsyncSendMessage(ctx, future.Topic, future.Partition, message); err != nil {
return 0, err
return 0, 0, err
}
return message.GetRowsCount(), nil
return message.GetRowsCount(), int64(message.Length()), nil
}); err != nil {
return err
}
14 changes: 7 additions & 7 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
@@ -754,10 +754,10 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
})
failpoint.Inject("MySQLSinkHangLongTime", func() { _ = util.Hang(pctx, time.Hour) })

err := s.statistics.RecordBatchExecution(func() (int, error) {
err := s.statistics.RecordBatchExecution(func() (int, int64, error) {
tx, err := s.db.BeginTx(pctx, nil)
if err != nil {
return 0, logDMLTxnErr(
return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.changefeed, "BEGIN", dmls.rowCount, dmls.startTs)
}
@@ -771,12 +771,12 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
err = s.multiStmtExecute(pctx, dmls, tx, writeTimeout)
if err != nil {
fallbackToSeqWay = true
return 0, err
return 0, 0, err
}
} else {
err = s.sequenceExecute(pctx, dmls, tx, writeTimeout)
if err != nil {
return 0, err
return 0, 0, err
}
}

@@ -794,15 +794,15 @@ func (s *mysqlBackend) execDMLWithMaxRetries(pctx context.Context, dmls *prepare
log.Warn("failed to rollback txn", zap.Error(rbErr))
}
}
return 0, err
return 0, 0, err
}

if err = tx.Commit(); err != nil {
return 0, logDMLTxnErr(
return 0, 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.changefeed, "COMMIT", dmls.rowCount, dmls.startTs)
}
return dmls.rowCount, nil
return dmls.rowCount, dmls.approximateSize, nil
})
if err != nil {
return errors.Trace(err)
10 changes: 10 additions & 0 deletions cdc/sink/metrics/metrics.go
@@ -36,6 +36,15 @@ var (
Buckets: prometheus.ExponentialBuckets(1, 2, 18),
}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`

// TotalWriteBytesCounter records the total number of bytes written by sink.
TotalWriteBytesCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "write_bytes_total",
Help: "Total number of bytes written by sink",
}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`

// LargeRowSizeHistogram records size of large rows.
LargeRowSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -69,6 +78,7 @@ var (
// InitMetrics registers all metrics in this file.
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ExecBatchHistogram)
registry.MustRegister(TotalWriteBytesCounter)
registry.MustRegister(ExecDDLHistogram)
registry.MustRegister(LargeRowSizeHistogram)
registry.MustRegister(ExecutionErrorCounter)
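As a side note, the Namespace/Subsystem/Name fields above mean the new series is exposed as ticdc_sink_write_bytes_total, labelled by namespace, changefeed, and sink type. A self-contained sketch (not part of this PR) that builds a counter of the same shape and reads it back with the client_golang test helper; the label values are made up for the example:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Same shape as TotalWriteBytesCounter above; the fully-qualified
	// series name becomes ticdc_sink_write_bytes_total.
	writeBytes := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "ticdc",
		Subsystem: "sink",
		Name:      "write_bytes_total",
		Help:      "Total number of bytes written by sink",
	}, []string{"namespace", "changefeed", "type"})

	c := writeBytes.WithLabelValues("default", "changefeed-1", "mq")
	c.Add(4096)
	fmt.Println(testutil.ToFloat64(c)) // 4096
}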
8 changes: 6 additions & 2 deletions cdc/sink/metrics/statistics.go
@@ -36,6 +36,7 @@ func NewStatistics(ctx context.Context, sinkType sink.Type) *Statistics {
s := sinkType.String()
statistics.metricExecDDLHis = ExecDDLHistogram.WithLabelValues(namespcae, changefeedID, s)
statistics.metricExecBatchHis = ExecBatchHistogram.WithLabelValues(namespcae, changefeedID, s)
statistics.metricTotalWriteBytesCnt = TotalWriteBytesCounter.WithLabelValues(namespcae, changefeedID, s)
statistics.metricRowSizeHis = LargeRowSizeHistogram.WithLabelValues(namespcae, changefeedID, s)
statistics.metricExecErrCnt = ExecutionErrorCounter.WithLabelValues(namespcae, changefeedID, s)
return statistics
@@ -52,6 +53,8 @@ type Statistics struct {
metricExecDDLHis prometheus.Observer
// Histogram for DML batch size.
metricExecBatchHis prometheus.Observer
// Counter for total bytes of DML.
metricTotalWriteBytesCnt prometheus.Counter
// Histogram for Row size.
metricRowSizeHis prometheus.Observer
// Counter for sink error.
@@ -70,13 +73,14 @@ func (b *Statistics) ObserveRows(rows ...*model.RowChangedEvent) {
}

// RecordBatchExecution stats batch executors which return (batchRowCount, batchWriteBytes, error).
func (b *Statistics) RecordBatchExecution(executor func() (int, error)) error {
batchSize, err := executor()
func (b *Statistics) RecordBatchExecution(executor func() (int, int64, error)) error {
batchSize, batchWriteBytes, err := executor()
if err != nil {
b.metricExecErrCnt.Inc()
return err
}
b.metricExecBatchHis.Observe(float64(batchSize))
b.metricTotalWriteBytesCnt.Add(float64(batchWriteBytes))
return nil
}

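For sink backends outside this diff, the only adjustment is that the executor passed to RecordBatchExecution now also returns the written byte count. A short caller-side sketch under the same assumption about the metrics import path; flushWithStats and execBatch are hypothetical names:

package sketch

import "github.com/pingcap/tiflow/cdc/sink/metrics"

// flushWithStats shows the caller-side shape of the new API; execBatch is
// a hypothetical function returning (rows, bytes, error).
func flushWithStats(stats *metrics.Statistics, execBatch func() (int, int64, error)) error {
	return stats.RecordBatchExecution(func() (int, int64, error) {
		rowCnt, byteCnt, err := execBatch()
		if err != nil {
			// On error the statistics layer bumps its error counter and
			// leaves the write-bytes counter untouched.
			return 0, 0, err
		}
		return rowCnt, byteCnt, nil
	})
}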