Skip to content

Commit

Permalink
chore(deps): add telemetry instrumentation to GC
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyberezansky committed Jul 29, 2024
1 parent 973bed5 commit f70ba39
Showing 1 changed file with 28 additions and 1 deletion.
29 changes: 28 additions & 1 deletion pkg/wekafs/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/rs/zerolog/log"
"github.com/wekafs/csi-wekafs/pkg/wekafs/apiclient"
"go.opentelemetry.io/otel"
"io"
"os"
"path/filepath"
Expand All @@ -28,6 +29,9 @@ func initInnerPathVolumeGc(mounter *wekaMounter) *innerPathVolGc {
}

func (gc *innerPathVolGc) triggerGc(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
op := "triggerGc"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
gc.Lock()
defer gc.Unlock()
if gc.isRunning[fs] {
Expand All @@ -39,6 +43,10 @@ func (gc *innerPathVolGc) triggerGc(ctx context.Context, fs string, apiClient *a
}

func (gc *innerPathVolGc) triggerGcVolume(ctx context.Context, volume *Volume) {
op := "triggerGcVolume"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("volume_id", volume.GetId()).Logger()
logger.Info().Msg("Triggering garbage collection of volume")
fsName := volume.FilesystemName
Expand All @@ -52,6 +60,10 @@ func (gc *innerPathVolGc) triggerGcVolume(ctx context.Context, volume *Volume) {
}

func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
op := "purgeVolume"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("volume_id", volume.GetId()).Logger()
logger.Debug().Msg("Starting garbage collection of volume")
fsName := volume.FilesystemName
Expand Down Expand Up @@ -86,6 +98,10 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
}

func purgeDirectory(ctx context.Context, path string, maxThreads int) error {
op := "purgeDirectory"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("path", path).Logger()
if !PathExists(path) {
logger.Error().Str("path", path).Msg("Failed to remove existing directory")
Expand Down Expand Up @@ -124,7 +140,12 @@ func purgeDirectory(ctx context.Context, path string, maxThreads int) error {
}

func purgeDirectoryWorker(ctx context.Context, path string, paths chan<- string) error {
op := "purgeDirectoryWorker"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("path", path).Logger()

logger.Trace().Msg("Starting GC worker thread")
defer logger.Trace().Msg("Finishing GC worker thread")
for !pathIsEmptyDir(path) {
Expand All @@ -138,13 +159,19 @@ func purgeDirectoryWorker(ctx context.Context, path string, paths chan<- string)
if f.IsDir() {
paths <- fp
} else if err := os.Remove(fp); err != nil {
logger.Error().Err(err).Msg("Failed to remove file")
if !os.IsNotExist(err) {
logger.Error().Err(err).Msg("Failed to remove file")
}
}
}
}
return os.Remove(path)
}
func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
op := "purgeLeftovers"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)
defer gc.finishGcCycle(ctx, fs, apiClient)
path, err, unmount := gc.mounter.Mount(ctx, fs, apiClient)
Expand Down

0 comments on commit f70ba39

Please sign in to comment.