From 074604d614f21af19ca69ba3162b3acf9647dda9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 15 Dec 2023 16:40:26 +0100 Subject: [PATCH] upload session metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../unreleased/upload-session-metrics.md | 5 +++ pkg/rhttp/datatx/metrics/metrics.go | 40 +++++++++++++++++++ .../utils/decomposedfs/decomposedfs.go | 10 +++++ pkg/storage/utils/decomposedfs/node/node.go | 4 ++ pkg/storage/utils/decomposedfs/upload.go | 3 ++ .../utils/decomposedfs/upload/upload.go | 7 ++++ 6 files changed, 69 insertions(+) create mode 100644 changelog/unreleased/upload-session-metrics.md diff --git a/changelog/unreleased/upload-session-metrics.md b/changelog/unreleased/upload-session-metrics.md new file mode 100644 index 0000000000..6041732640 --- /dev/null +++ b/changelog/unreleased/upload-session-metrics.md @@ -0,0 +1,5 @@ +Enhancement: track more upload session metrics + +We added a gauge for the number of uploads currently in postprocessing as well as counters for different postprocessing outcomes. + +https://github.com/cs3org/reva/pull/4414 diff --git a/pkg/rhttp/datatx/metrics/metrics.go b/pkg/rhttp/datatx/metrics/metrics.go index f090a04c33..80ca36c2c1 100644 --- a/pkg/rhttp/datatx/metrics/metrics.go +++ b/pkg/rhttp/datatx/metrics/metrics.go @@ -17,4 +17,44 @@ var ( Name: "reva_upload_active", Help: "Number of active uploads", }) + // UploadProcessing is the number of uploads in processing + UploadProcessing = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "reva_upload_processing", + Help: "Number of uploads in processing", + }) + // UploadSessionsInitiated is the number of upload sessions that have been initiated + UploadSessionsInitiated = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_initiated", + Help: "Number of uploads sessions that were initiated", + }) + // UploadSessionsBytesReceived is the number of upload sessions that have received all bytes + UploadSessionsBytesReceived = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_bytes_received", + Help: "Number of uploads sessions that have received all bytes", + }) + // UploadSessionsFinalized is the number of upload sessions that have received all bytes + UploadSessionsFinalized = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_finalized", + Help: "Number of uploads sessions that have successfully completed", + }) + // UploadSessionsAborted is the number of upload sessions that have been aborted + UploadSessionsAborted = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_aborted", + Help: "Number of uploads sessions that have aborted by postprocessing", + }) + // UploadSessionsDeleted is the number of upload sessions that have been deleted + UploadSessionsDeleted = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_deleted", + Help: "Number of uploads sessions that have been deleted by postprocessing", + }) + // UploadSessionsRestarted is the number of upload sessions that have been restarted + UploadSessionsRestarted = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_restarted", + Help: "Number of uploads sessions that have been restarted by postprocessing", + }) + // UploadSessionsScanned is the number of upload sessions that have been scanned by antivirus + UploadSessionsScanned = promauto.NewCounter(prometheus.CounterOpts{ + Name: "reva_upload_sessions_scanned", + Help: "Number of uploads sessions that have been scanned by antivirus", + }) ) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 60662ee814..23384d5536 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -42,6 +42,7 @@ import ( "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/cache" @@ -268,14 +269,18 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { case events.PPOutcomeAbort: failed = true keepUpload = true + metrics.UploadSessionsAborted.Inc() case events.PPOutcomeContinue: if err := up.Finalize(); err != nil { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") keepUpload = true // should we keep the upload when assembling failed? failed = true + } else { + metrics.UploadSessionsFinalized.Inc() } case events.PPOutcomeDelete: failed = true + metrics.UploadSessionsDeleted.Inc() } getParent := func() *node.Node { @@ -344,6 +349,9 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url") continue } + + metrics.UploadSessionsRestarted.Inc() + // restart postprocessing if err := events.Publish(ctx, fs.stream, events.BytesReceived{ UploadID: up.Info.ID, @@ -471,6 +479,8 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { continue } + metrics.UploadSessionsScanned.Inc() + // remove cache entry in gateway fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) default: diff --git a/pkg/storage/utils/decomposedfs/node/node.go b/pkg/storage/utils/decomposedfs/node/node.go index 3eb29ea4d5..a5f9c2b1df 100644 --- a/pkg/storage/utils/decomposedfs/node/node.go +++ b/pkg/storage/utils/decomposedfs/node/node.go @@ -41,6 +41,7 @@ import ( ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/mime" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/storage/utils/ace" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" @@ -1220,6 +1221,9 @@ func (n *Node) FindStorageSpaceRoot(ctx context.Context) error { // UnmarkProcessing removes the processing flag from the node func (n *Node) UnmarkProcessing(ctx context.Context, uploadID string) error { + // we currently have to decrease the counter for every processing run to match the incrases + metrics.UploadProcessing.Sub(1) + v, _ := n.XattrString(ctx, prefixes.StatusPrefix) if v != ProcessingStatus+uploadID { // file started another postprocessing later - do not remove diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 59ef12a4e1..64fd575d14 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -34,6 +34,7 @@ import ( "github.com/cs3org/reva/v2/pkg/appctx" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/utils/chunking" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" @@ -215,6 +216,8 @@ func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Refere info, _ = upload.GetInfo(ctx) + metrics.UploadSessionsInitiated.Inc() + return map[string]string{ "simple": info.ID, "tus": info.ID, diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index c3e0446054..cb00376a5e 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -39,6 +39,7 @@ import ( ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" "github.com/cs3org/reva/v2/pkg/errtypes" "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/metrics" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" @@ -265,6 +266,11 @@ func (upload *Upload) FinishUpload(_ context.Context) error { return err } + // increase the processing counter for every started processing + // will be decreased in Cleanup() + metrics.UploadProcessing.Inc() + metrics.UploadSessionsBytesReceived.Inc() + upload.Node = n if upload.pub != nil { @@ -295,6 +301,7 @@ func (upload *Upload) FinishUpload(_ context.Context) error { log.Error().Err(err).Msg("failed to upload") return err } + metrics.UploadSessionsFinalized.Inc() } return upload.tp.Propagate(upload.Ctx, n, upload.SizeDiff)