Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] - S3 metrics #3577

Merged
merged 31 commits into from
Nov 26, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
merge main
ahrav committed Nov 16, 2024
commit abc25b2590c66de3d648b6fb15cf94f79058c7eb
51 changes: 41 additions & 10 deletions pkg/sources/s3/progress_tracker.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package s3

import (
"encoding/json"
"errors"
"fmt"
"sync"

@@ -15,6 +14,33 @@ import (
// ProgressTracker maintains scan progress state for S3 bucket scanning,
// enabling resumable scans by tracking which objects have been successfully processed.
// It provides checkpoints that can be used to resume interrupted scans without missing objects.
//
// S3 buckets are organized as flat namespaces of objects identified by unique keys.
// When listing objects, S3 returns paginated results with a maximum of 1000 objects per page.
// The ListObjectsV2 API accepts a 'StartAfter' parameter that allows resuming the listing
// from a specific object key.
//
// The tracker maintains state for the current page of objects (up to 1000) using a boolean slice
// to track completion status and an ordered list to record the sequence of completions.
// This enables finding the highest consecutively completed index as a "low water mark".
//
// The key of the object at this index is encoded with the current bucket into a ResumeInfo checkpoint
// and persisted in the Progress.EncodedResumeInfo field as JSON. If a scan is interrupted, it can
// resume from the last checkpoint by using that key as StartAfter.
//
// The low water mark approach ensures scan reliability by only checkpointing consecutively completed
// objects. For example, if objects 0-5 and 7-8 are complete but 6 is incomplete, only objects 0-5
// will be checkpointed. While this may result in re-scanning objects 7-8 when resuming, it guarantees
// no objects are missed in case of interruption.
//
// When scanning multiple buckets, the current bucket is tracked in the checkpoint to enable
// resuming from the correct bucket. The scan will continue from the last checkpointed object
// in that bucket.
//
// For example, if scanning is interrupted partway through the second of 2 pages:
// Page 1 (objects 0-999): Fully processed, checkpoint saved at object 999
// Page 2 (objects 1000-1999): Some objects up to 1600 completed, but only consecutively through 1499
// On resume: StartAfter=object1499 in saved bucket, scanning continues from object 1500
type ProgressTracker struct {
enabled bool

@@ -24,6 +50,7 @@ type ProgressTracker struct {
completionOrder []int // Track the order in which objects complete

// progress holds the scan's overall progress state and enables persistence.
// The EncodedResumeInfo field stores the JSON-encoded ResumeInfo checkpoint.
progress *sources.Progress // Reference to source's Progress
}

@@ -32,22 +59,20 @@ const defaultMaxObjectsPerPage = 1000
// NewProgressTracker creates a new progress tracker for S3 scanning operations.
// The enabled parameter determines if progress tracking is active, and progress
// provides the underlying mechanism for persisting scan state.
func NewProgressTracker(_ context.Context, enabled bool, progress *sources.Progress) (*ProgressTracker, error) {
if progress == nil {
return nil, errors.New("Nil progress provided; progress is required for tracking")
}
func NewProgressTracker(ctx context.Context, enabled bool, progress *sources.Progress) *ProgressTracker {
ctx.Logger().Info("Creating progress tracker")

return &ProgressTracker{
// We are resuming if we have completed objects from a previous scan.
completedObjects: make([]bool, defaultMaxObjectsPerPage),
completionOrder: make([]int, 0, defaultMaxObjectsPerPage),
enabled: enabled,
progress: progress,
}, nil
}
}

// Reset prepares the tracker for a new page of objects by clearing the completion state.
func (p *ProgressTracker) Reset(_ context.Context) {
func (p *ProgressTracker) Reset() {
if !p.enabled {
return
}
@@ -73,9 +98,6 @@ type ResumeInfo struct {
// the minimum required data to enable resumption.
func (p *ProgressTracker) GetResumePoint(ctx context.Context) (ResumeInfo, error) {
resume := ResumeInfo{}
if p.progress == nil {
return resume, errors.New("progress is nil, progress is required for resuming")
}

if !p.enabled || p.progress.EncodedResumeInfo == "" {
return resume, nil
@@ -118,6 +140,15 @@ func (p *ProgressTracker) Complete(_ context.Context, message string) error {
// This approach ensures scan reliability by only checkpointing consecutively completed
// objects. While this may result in re-scanning some objects when resuming, it guarantees
// no objects are missed in case of interruption.
//
// For example, consider scanning a page of 10 objects where objects 0-5 and 7-8 complete
// successfully but object 6 fails:
// - Objects completed: [0,1,2,3,4,5,7,8]
// - The checkpoint will only include objects 0-5 since they are consecutive
// - If scanning is interrupted and resumed:
// - Scan resumes after object 5 (the last checkpoint)
// - Objects 7-8 will be re-scanned even though they completed before
// - This ensures object 6 is not missed
func (p *ProgressTracker) UpdateObjectProgress(
ctx context.Context,
completedIdx int,
23 changes: 6 additions & 17 deletions pkg/sources/s3/progress_tracker_test.go
Original file line number Diff line number Diff line change
@@ -18,8 +18,7 @@ func TestProgressTrackerResumption(t *testing.T) {

// First scan - process 6 objects then interrupt.
initialProgress := &sources.Progress{}
tracker, err := NewProgressTracker(ctx, true, initialProgress)
require.NoError(t, err)
tracker := NewProgressTracker(ctx, true, initialProgress)

firstPage := &s3.ListObjectsV2Output{
Contents: make([]*s3.Object, 12), // Total of 12 objects
@@ -42,8 +41,7 @@ func TestProgressTrackerResumption(t *testing.T) {
assert.Equal(t, "key-5", resumeInfo.StartAfter)

// Resume scan with existing progress.
resumeTracker, err := NewProgressTracker(ctx, true, initialProgress)
require.NoError(t, err)
resumeTracker := NewProgressTracker(ctx, true, initialProgress)

resumePage := &s3.ListObjectsV2Output{
Contents: firstPage.Contents[6:], // Remaining 6 objects
@@ -77,13 +75,12 @@ func TestProgressTrackerReset(t *testing.T) {

ctx := context.Background()
progress := new(sources.Progress)
tracker, err := NewProgressTracker(ctx, tt.enabled, progress)
require.NoError(t, err)
tracker := NewProgressTracker(ctx, tt.enabled, progress)

tracker.completedObjects[1] = true
tracker.completedObjects[2] = true

tracker.Reset(ctx)
tracker.Reset()

assert.Equal(t, defaultMaxObjectsPerPage, len(tracker.completedObjects),
"Reset changed the length of completed objects")
@@ -138,13 +135,6 @@ func TestGetResumePoint(t *testing.T) {
EncodedResumeInfo: `{"current_bucket":"","start_after":"test-key"}`,
},
},
{
name: "nil progress",
enabled: true,
progress: nil,
expectedResumeInfo: ResumeInfo{},
expectError: true,
},
{
name: "unmarshal error",
enabled: true,
@@ -323,10 +313,9 @@ func TestComplete(t *testing.T) {
EncodedResumeInfo: tt.initialState.resumeInfo,
Message: tt.initialState.message,
}
tracker, err := NewProgressTracker(ctx, tt.enabled, progress)
assert.NoError(t, err)
tracker := NewProgressTracker(ctx, tt.enabled, progress)

err = tracker.Complete(ctx, tt.completeMessage)
err := tracker.Complete(ctx, tt.completeMessage)
assert.NoError(t, err)

assert.Equal(t, tt.wantState.resumeInfo, progress.EncodedResumeInfo)
You are viewing a condensed version of this merge commit. You can view the full changes here.