diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic.go b/sdks/go/pkg/beam/transforms/periodic/periodic.go index 84ee50f2ae6c..cc9c342b9125 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic.go @@ -61,8 +61,8 @@ func NewSequenceDefinition(start, end time.Time, interval time.Duration) Sequenc } } -func CalculateByteSizeOfSequence(now time.Time, sd SequenceDefinition, rest offsetrange.Restriction) int64 { - // Find the # of outputs expected for overlap of and [-inf, now) +// Calculates size of the output that the sequence should have emitted up to now. +func calculateSequenceByteSize(now time.Time, sd SequenceDefinition, rest offsetrange.Restriction) int64 { nowIndex := int64(now.Sub(mtime.Time(sd.Start).ToTime()) / sd.Interval) if nowIndex < rest.Start { return 0 @@ -85,7 +85,7 @@ func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.Lock } func (fn *sequenceGenDoFn) RestrictionSize(sd SequenceDefinition, rest offsetrange.Restriction) float64 { - return float64(CalculateByteSizeOfSequence(time.Now(), sd, rest)) + return float64(calculateSequenceByteSize(time.Now(), sd, rest)) } func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction { diff --git a/sdks/go/pkg/beam/transforms/periodic/periodic_test.go b/sdks/go/pkg/beam/transforms/periodic/periodic_test.go index 56dba8778684..26f94be5b4e9 100644 --- a/sdks/go/pkg/beam/transforms/periodic/periodic_test.go +++ b/sdks/go/pkg/beam/transforms/periodic/periodic_test.go @@ -79,7 +79,7 @@ func TestSize(t *testing.T) { } for _, test := range sizeTests { - got := CalculateByteSizeOfSequence( + got := calculateSequenceByteSize( time.Unix(test.now, 0), sd, offsetrange.Restriction{ diff --git a/sdks/python/apache_beam/transforms/periodicsequence.py b/sdks/python/apache_beam/transforms/periodicsequence.py index 61c9aacd920c..613661b22957 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence.py +++ b/sdks/python/apache_beam/transforms/periodicsequence.py @@ -49,15 +49,17 @@ def create_tracker(self, restriction): return OffsetRestrictionTracker(restriction) def restriction_size(self, element, restriction): - return sequence_backlog_bytes(element, time.time(), restriction) + return _sequence_backlog_bytes(element, time.time(), restriction) # On drain, immediately stop emitting new elements def truncate(self, unused_element, unused_restriction): return None -def sequence_backlog_bytes(element, now, offset_range): - # Find the # of outputs expected for overlap of and [-inf, now) +def _sequence_backlog_bytes(element, now, offset_range): + ''' + Calculates size of the output that the sequence should have emitted up to now. + ''' start, _, interval = element if isinstance(start, Timestamp): start = start.micros / 1000000 @@ -66,6 +68,8 @@ def sequence_backlog_bytes(element, now, offset_range): now_index = math.floor((now - start) / interval) if now_index < offset_range.start: return 0 + # We attempt to be precise as some runners scale based upon bytes and + # output byte throughput. return 8 * (min(offset_range.stop, now_index) - offset_range.start) diff --git a/sdks/python/apache_beam/transforms/periodicsequence_test.py b/sdks/python/apache_beam/transforms/periodicsequence_test.py index 58b3c2a61b87..221520c94622 100644 --- a/sdks/python/apache_beam/transforms/periodicsequence_test.py +++ b/sdks/python/apache_beam/transforms/periodicsequence_test.py @@ -30,7 +30,7 @@ from apache_beam.testing.util import equal_to from apache_beam.transforms.periodicsequence import PeriodicImpulse from apache_beam.transforms.periodicsequence import PeriodicSequence -from apache_beam.transforms.periodicsequence import sequence_backlog_bytes +from apache_beam.transforms.periodicsequence import _sequence_backlog_bytes # Disable frequent lint warning due to pipe operator for chaining transforms. # pylint: disable=expression-not-assigned @@ -117,20 +117,20 @@ def test_periodicsequence_outputs_valid_sequence_in_past(self): def test_periodicsequence_output_size(self): element = [0, 1000000000, 10] self.assertEqual( - sequence_backlog_bytes(element, 100, OffsetRange(10, 100000000)), 0) + _sequence_backlog_bytes(element, 100, OffsetRange(10, 100000000)), 0) self.assertEqual( - sequence_backlog_bytes(element, 100, OffsetRange(9, 100000000)), 8) + _sequence_backlog_bytes(element, 100, OffsetRange(9, 100000000)), 8) self.assertEqual( - sequence_backlog_bytes(element, 100, OffsetRange(8, 100000000)), 16) + _sequence_backlog_bytes(element, 100, OffsetRange(8, 100000000)), 16) self.assertEqual( - sequence_backlog_bytes(element, 101, OffsetRange(9, 100000000)), 8) + _sequence_backlog_bytes(element, 101, OffsetRange(9, 100000000)), 8) self.assertEqual( - sequence_backlog_bytes(element, 10000, OffsetRange(0, 100000000)), + _sequence_backlog_bytes(element, 10000, OffsetRange(0, 100000000)), 8 * 10000 / 10) self.assertEqual( - sequence_backlog_bytes(element, 10000, OffsetRange(1002, 1003)), 0) + _sequence_backlog_bytes(element, 10000, OffsetRange(1002, 1003)), 0) self.assertEqual( - sequence_backlog_bytes(element, 10100, OffsetRange(1002, 1003)), 8) + _sequence_backlog_bytes(element, 10100, OffsetRange(1002, 1003)), 8) if __name__ == '__main__':