Skip to content

Commit

Permalink
Merge pull request #4414 from butonic/upload-session-metrics
Browse files Browse the repository at this point in the history
upload session metrics
  • Loading branch information
butonic authored Dec 18, 2023
2 parents 8965ee0 + 074604d commit 6a3a91e
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 0 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/upload-session-metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: track more upload session metrics

We added a gauge for the number of uploads currently in postprocessing as well as counters for different postprocessing outcomes.

https://github.com/cs3org/reva/pull/4414
40 changes: 40 additions & 0 deletions pkg/rhttp/datatx/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,44 @@ var (
Name: "reva_upload_active",
Help: "Number of active uploads",
})
// UploadProcessing is the number of uploads in processing
UploadProcessing = promauto.NewGauge(prometheus.GaugeOpts{
Name: "reva_upload_processing",
Help: "Number of uploads in processing",
})
// UploadSessionsInitiated is the number of upload sessions that have been initiated
UploadSessionsInitiated = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_initiated",
Help: "Number of uploads sessions that were initiated",
})
// UploadSessionsBytesReceived is the number of upload sessions that have received all bytes
UploadSessionsBytesReceived = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_bytes_received",
Help: "Number of uploads sessions that have received all bytes",
})
// UploadSessionsFinalized is the number of upload sessions that have received all bytes
UploadSessionsFinalized = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_finalized",
Help: "Number of uploads sessions that have successfully completed",
})
// UploadSessionsAborted is the number of upload sessions that have been aborted
UploadSessionsAborted = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_aborted",
Help: "Number of uploads sessions that have aborted by postprocessing",
})
// UploadSessionsDeleted is the number of upload sessions that have been deleted
UploadSessionsDeleted = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_deleted",
Help: "Number of uploads sessions that have been deleted by postprocessing",
})
// UploadSessionsRestarted is the number of upload sessions that have been restarted
UploadSessionsRestarted = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_restarted",
Help: "Number of uploads sessions that have been restarted by postprocessing",
})
// UploadSessionsScanned is the number of upload sessions that have been scanned by antivirus
UploadSessionsScanned = promauto.NewCounter(prometheus.CounterOpts{
Name: "reva_upload_sessions_scanned",
Help: "Number of uploads sessions that have been scanned by antivirus",
})
)
10 changes: 10 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/logger"
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/cache"
Expand Down Expand Up @@ -268,14 +269,18 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
case events.PPOutcomeAbort:
failed = true
keepUpload = true
metrics.UploadSessionsAborted.Inc()
case events.PPOutcomeContinue:
if err := up.Finalize(); err != nil {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload")
keepUpload = true // should we keep the upload when assembling failed?
failed = true
} else {
metrics.UploadSessionsFinalized.Inc()
}
case events.PPOutcomeDelete:
failed = true
metrics.UploadSessionsDeleted.Inc()
}

getParent := func() *node.Node {
Expand Down Expand Up @@ -344,6 +349,9 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url")
continue
}

metrics.UploadSessionsRestarted.Inc()

// restart postprocessing
if err := events.Publish(ctx, fs.stream, events.BytesReceived{
UploadID: up.Info.ID,
Expand Down Expand Up @@ -471,6 +479,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
continue
}

metrics.UploadSessionsScanned.Inc()

// remove cache entry in gateway
fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID})
default:
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/mime"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage/utils/ace"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
Expand Down Expand Up @@ -1220,6 +1221,9 @@ func (n *Node) FindStorageSpaceRoot(ctx context.Context) error {

// UnmarkProcessing removes the processing flag from the node
func (n *Node) UnmarkProcessing(ctx context.Context, uploadID string) error {
// we currently have to decrease the counter for every processing run to match the incrases
metrics.UploadProcessing.Sub(1)

v, _ := n.XattrString(ctx, prefixes.StatusPrefix)
if v != ProcessingStatus+uploadID {
// file started another postprocessing later - do not remove
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/utils/decomposedfs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage"
"github.com/cs3org/reva/v2/pkg/storage/utils/chunking"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
Expand Down Expand Up @@ -215,6 +216,8 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere

info, _ = upload.GetInfo(ctx)

metrics.UploadSessionsInitiated.Inc()

return map[string]string{
"simple": info.ID,
"tus": info.ID,
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/utils/decomposedfs/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/events"
"github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
Expand Down Expand Up @@ -265,6 +266,11 @@ func (upload *Upload) FinishUpload(_ context.Context) error {
return err
}

// increase the processing counter for every started processing
// will be decreased in Cleanup()
metrics.UploadProcessing.Inc()
metrics.UploadSessionsBytesReceived.Inc()

upload.Node = n

if upload.pub != nil {
Expand Down Expand Up @@ -295,6 +301,7 @@ func (upload *Upload) FinishUpload(_ context.Context) error {
log.Error().Err(err).Msg("failed to upload")
return err
}
metrics.UploadSessionsFinalized.Inc()
}

return upload.tp.Propagate(upload.Ctx, n, upload.SizeDiff)
Expand Down

0 comments on commit 6a3a91e

Please sign in to comment.