From 0d58d23f2d9c34ae5f068acddc622fa3ac6b0854 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Fri, 12 Jan 2024 11:13:11 -0800 Subject: [PATCH] [#29180][prism] Return total element count to progress loop. Split less aggressively. (#29968) * [prism] Return total element count to progress loop. Split less aggressively. * Update comments. --------- Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- .../runners/prism/internal/jobservices/job.go | 2 +- .../prism/internal/jobservices/metrics.go | 18 ++++++++++++------ .../pkg/beam/runners/prism/internal/stage.go | 15 ++++++++++----- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index d6e906bee59f..bb5eb88c9193 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -100,7 +100,7 @@ func (j *Job) PipelineOptions() *structpb.Struct { } // ContributeTentativeMetrics returns the datachannel read index, and any unknown monitoring short ids. -func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) { +func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (map[string]int64, []string) { return j.metrics.ContributeTentativeMetrics(payloads) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go index e0caec55881e..f90efdfa8bd9 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go @@ -480,8 +480,8 @@ func (m *metricsStore) AddShortIDs(resp *fnpb.MonitoringInfosMetadataResponse) { } } -func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte) (int64, []string) { - readIndex := int64(-1) +func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte) (map[string]int64, []string) { + var index, totalCount int64 if m.accums[d] == nil { m.accums[d] = map[metricKey]metricAccumulator{} } @@ -510,14 +510,20 @@ func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte) panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v", key.Urn(), key, a)) } accums[key] = a - if key.Urn() == "beam:metric:data_channel:read_index:v1" { - readIndex = a.(*sumInt64).sum + switch u := key.Urn(); u { + case "beam:metric:data_channel:read_index:v1": + index = a.(*sumInt64).sum // There should only be one of these per progress response. + case "beam:metric:element_count:v1": + totalCount += a.(*sumInt64).sum } } - return readIndex, missingShortIDs + return map[string]int64{ + "index": index, + "totalCount": totalCount, + }, missingShortIDs } -func (m *metricsStore) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) { +func (m *metricsStore) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (map[string]int64, []string) { m.mu.Lock() defer m.mu.Unlock() return m.contributeMetrics(tentative, payloads.GetMonitoringData()) diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index d677a0cd4cfe..e52031b43d1e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -130,7 +130,9 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c // Progress + split loop. previousIndex := int64(-2) - var splitsDone bool + previousTotalCount := int64(-2) // Total count of all pcollection elements. + + unsplit := true progTick := time.NewTicker(100 * time.Millisecond) defer progTick.Stop() var dataFinished, bundleFinished bool @@ -170,8 +172,10 @@ progress: j.AddMetricShortIDs(md) } slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex) - // Progress for the bundle hasn't advanced. Try splitting. - if previousIndex == index && !splitsDone { + + // Check if there has been any measurable progress by the input, or all output pcollections since last report. + slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"] + if slow && unsplit { slog.Debug("splitting report", "bundle", rb, "index", index) sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */) if err != nil { @@ -180,7 +184,7 @@ progress: } if sr.GetChannelSplits() == nil { slog.Debug("SDK returned no splits", "bundle", rb) - splitsDone = true + unsplit = false continue progress } // TODO sort out rescheduling primary Roots on bundle failure. @@ -206,7 +210,8 @@ progress: em.ReturnResiduals(rb, int(fr), s.inputInfo, residualData) } } else { - previousIndex = index + previousIndex = index["index"] + previousTotalCount = index["totalCount"] } } }