From 758344711a550c379989a218c1669a91d76c2f84 Mon Sep 17 00:00:00 2001 From: Miccah Date: Thu, 19 Oct 2023 08:46:49 -0700 Subject: [PATCH] Export ChunkError fields and add ErrorsFor convenience method (#1920) --- pkg/sources/job_progress.go | 22 ++++++++++++++++++---- pkg/sources/job_progress_test.go | 23 +++++++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/pkg/sources/job_progress.go b/pkg/sources/job_progress.go index 21db3b7a4119..5fffd79aba5d 100644 --- a/pkg/sources/job_progress.go +++ b/pkg/sources/job_progress.go @@ -89,14 +89,14 @@ func (f Fatal) Unwrap() error { return f.error } // ChunkError is a custom error type for errors encountered during chunking of // a specific unit. type ChunkError struct { - unit SourceUnit - err error + Unit SourceUnit + Err error } func (f ChunkError) Error() string { - return fmt.Sprintf("error chunking unit %q: %s", f.unit.SourceUnitID(), f.err.Error()) + return fmt.Sprintf("error chunking unit %q: %s", f.Unit.SourceUnitID(), f.Err.Error()) } -func (f ChunkError) Unwrap() error { return f.err } +func (f ChunkError) Unwrap() error { return f.Err } // JobProgress aggregates information about a run of a Source. type JobProgress struct { @@ -351,3 +351,17 @@ func (m JobProgressMetrics) ElapsedTime() time.Duration { } return m.EndTime.Sub(m.StartTime) } + +// ErrorsFor returns all the errors for the given SourceUnit. If there are no +// errors, including the case that the unit has not been encountered, nil will +// be returned. +func (m JobProgressMetrics) ErrorsFor(unit SourceUnit) []error { + var errs []error + for _, err := range m.Errors { + var chunkErr ChunkError + if errors.As(err, &chunkErr) && chunkErr.Unit == unit { + errs = append(errs, err) + } + } + return errs +} diff --git a/pkg/sources/job_progress_test.go b/pkg/sources/job_progress_test.go index 9376b329ed4a..47724edf3bbc 100644 --- a/pkg/sources/job_progress_test.go +++ b/pkg/sources/job_progress_test.go @@ -126,3 +126,26 @@ func TestJobProgressElapsedTime(t *testing.T) { metrics.EndTime = metrics.StartTime.Add(1 * time.Hour) assert.Equal(t, metrics.ElapsedTime(), 1*time.Hour) } + +func TestJobProgressErrorsFor(t *testing.T) { + metrics := JobProgressMetrics{ + Errors: []error{ + Fatal{ChunkError{ + Unit: CommonSourceUnit{ID: "foo"}, + Err: fmt.Errorf("foo error"), + }}, + ChunkError{ + Unit: CommonSourceUnit{ID: "foo"}, + Err: fmt.Errorf("foo again error"), + }, + ChunkError{ + Unit: CommonSourceUnit{ID: "bar"}, + Err: fmt.Errorf("bar error"), + }, + fmt.Errorf("hi there"), + }, + } + assert.Equal(t, 2, len(metrics.ErrorsFor(CommonSourceUnit{ID: "foo"}))) + assert.Equal(t, 1, len(metrics.ErrorsFor(CommonSourceUnit{ID: "bar"}))) + assert.Equal(t, 0, len(metrics.ErrorsFor(CommonSourceUnit{ID: "baz"}))) +}