Skip to content

Commit

Permalink
feat(restore): update metrics for native restore
Browse files Browse the repository at this point in the history
Fixes #4142
  • Loading branch information
Michal-Leszczynski committed Jan 30, 2025
1 parent 9f8417f commit a572b27
Showing 1 changed file with 43 additions and 5 deletions.
48 changes: 43 additions & 5 deletions pkg/service/restore/worker_scylla_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"time"

"github.com/pkg/errors"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/scheduler"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/sstable"
"github.com/scylladb/scylla-manager/v3/pkg/util/timeutc"
"github.com/scylladb/scylla-manager/v3/swagger/gen/scylla/v1/models"
)

Expand Down Expand Up @@ -40,7 +43,16 @@ func (w *worker) useScyllaRestoreAPI(ctx context.Context, b batch, host string)
return nc.SupportsScyllaBackupRestoreAPI()
}

func (w *tablesWorker) scyllaRestore(ctx context.Context, host string, b batch) error {
func (w *tablesWorker) scyllaRestore(ctx context.Context, host string, b batch) (err error) {
w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, host, metrics.RestoreStateNativeRestore)
defer func() {
if err != nil && scheduler.IsTaskInterrupted(ctx) {
w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, host, metrics.RestoreStateError)
} else {
w.metrics.SetRestoreState(w.run.ClusterID, b.Location, w.run.SnapshotTag, host, metrics.RestoreStateIdle)
}
}()

// RemoteSSTableDir has "<provider>:<bucket>/<path>" format
prefix, ok := strings.CutPrefix(b.RemoteSSTableDir, b.Location.StringWithoutDC())
if !ok {
Expand Down Expand Up @@ -105,14 +117,40 @@ func (w *tablesWorker) scyllaWaitTask(ctx context.Context, pr *RunProgress, b ba
}
}

func (w *worker) scyllaUpdateProgress(ctx context.Context, pr *RunProgress, b batch, task *models.TaskStatus) {
func (w *tablesWorker) scyllaUpdateProgress(ctx context.Context, pr *RunProgress, b batch, task *models.TaskStatus) {
now := timeutc.Now()
restored := b.Size * int64(task.ProgressCompleted/task.ProgressTotal)
restoredDiff := restored - pr.Restored
var startedAt, completedAt *time.Time
if t := time.Time(task.StartTime); !t.IsZero() {
pr.RestoreStartedAt = &t
startedAt = &t
}
if t := time.Time(task.EndTime); !t.IsZero() {
pr.RestoreCompletedAt = &t
completedAt = &t
}

// Update metrics boilerplate
w.metrics.IncreaseRestoredBytes(w.run.ClusterID, pr.Host, restoredDiff)
w.metrics.IncreaseRestoreDuration(w.run.ClusterID, pr.Host, timeSub(startedAt, completedAt, now))
w.metrics.DecreaseRemainingBytes(metrics.RestoreBytesLabels{
ClusterID: b.ClusterID.String(),
SnapshotTag: b.SnapshotTag,
Location: b.Location.String(),
DC: b.DC,
Node: b.NodeID,
Keyspace: b.Keyspace,
Table: b.Table,
}, restoredDiff)
w.progress.Update(restoredDiff)
w.metrics.SetProgress(metrics.RestoreProgressLabels{
ClusterID: w.run.ClusterID.String(),
SnapshotTag: w.run.SnapshotTag,
}, w.progress.CurrentProgress())

// Update run progress
pr.RestoreStartedAt = startedAt
pr.RestoreCompletedAt = completedAt
pr.Error = task.Error
pr.Restored = b.Size * int64(task.ProgressCompleted/task.ProgressTotal)
pr.Restored = restored
w.insertRunProgress(ctx, pr)
}

0 comments on commit a572b27

Please sign in to comment.