Skip to content

Commit

Permalink
[#29180][prism] Return total element count to progress loop. Split less aggressively. (#29968)
Browse files Browse the repository at this point in the history

* [prism] Return total element count to progress loop. Split less aggressively.

* Update comments.

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Jan 12, 2024
1 parent 7cb559f commit 0d58d23
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (j *Job) PipelineOptions() *structpb.Struct {
}

// ContributeTentativeMetrics returns the data channel read index and the total element count (under the keys "index" and "totalCount"), and any unknown monitoring short ids.
func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) {
func (j *Job) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (map[string]int64, []string) {
return j.metrics.ContributeTentativeMetrics(payloads)
}

Expand Down
18 changes: 12 additions & 6 deletions sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ func (m *metricsStore) AddShortIDs(resp *fnpb.MonitoringInfosMetadataResponse) {
}
}

func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte) (int64, []string) {
readIndex := int64(-1)
func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte) (map[string]int64, []string) {
var index, totalCount int64
if m.accums[d] == nil {
m.accums[d] = map[metricKey]metricAccumulator{}
}
Expand Down Expand Up @@ -510,14 +510,20 @@ func (m *metricsStore) contributeMetrics(d durability, mdata map[string][]byte)
panic(fmt.Sprintf("error decoding metrics %v: %+v\n\t%+v", key.Urn(), key, a))
}
accums[key] = a
if key.Urn() == "beam:metric:data_channel:read_index:v1" {
readIndex = a.(*sumInt64).sum
switch u := key.Urn(); u {
case "beam:metric:data_channel:read_index:v1":
index = a.(*sumInt64).sum // There should only be one of these per progress response.
case "beam:metric:element_count:v1":
totalCount += a.(*sumInt64).sum
}
}
return readIndex, missingShortIDs
return map[string]int64{
"index": index,
"totalCount": totalCount,
}, missingShortIDs
}

func (m *metricsStore) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (int64, []string) {
func (m *metricsStore) ContributeTentativeMetrics(payloads *fnpb.ProcessBundleProgressResponse) (map[string]int64, []string) {
m.mu.Lock()
defer m.mu.Unlock()
return m.contributeMetrics(tentative, payloads.GetMonitoringData())
Expand Down
15 changes: 10 additions & 5 deletions sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c

// Progress + split loop.
previousIndex := int64(-2)
var splitsDone bool
previousTotalCount := int64(-2) // Total count of all pcollection elements.

unsplit := true
progTick := time.NewTicker(100 * time.Millisecond)
defer progTick.Stop()
var dataFinished, bundleFinished bool
Expand Down Expand Up @@ -170,8 +172,10 @@ progress:
j.AddMetricShortIDs(md)
}
slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
// Progress for the bundle hasn't advanced. Try splitting.
if previousIndex == index && !splitsDone {

// Check whether either the input read index or the total element count across all pcollections has advanced since the last report; if neither has, the bundle is considered slow.
slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
if slow && unsplit {
slog.Debug("splitting report", "bundle", rb, "index", index)
sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
if err != nil {
Expand All @@ -180,7 +184,7 @@ progress:
}
if sr.GetChannelSplits() == nil {
slog.Debug("SDK returned no splits", "bundle", rb)
splitsDone = true
unsplit = false
continue progress
}
// TODO sort out rescheduling primary Roots on bundle failure.
Expand All @@ -206,7 +210,8 @@ progress:
em.ReturnResiduals(rb, int(fr), s.inputInfo, residualData)
}
} else {
previousIndex = index
previousIndex = index["index"]
previousTotalCount = index["totalCount"]
}
}
}
Expand Down

0 comments on commit 0d58d23

Please sign in to comment.