Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
scwhittle committed Sep 24, 2024
1 parent 77c969b commit 723190a
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 15 deletions.
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/transforms/periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func NewSequenceDefinition(start, end time.Time, interval time.Duration) Sequenc
}
}

func CalculateByteSizeOfSequence(now time.Time, sd SequenceDefinition, rest offsetrange.Restriction) int64 {
// Find the # of outputs expected for the overlap of the restriction and [-inf, now)
// calculateSequenceByteSize calculates the size of the output that the sequence should have emitted up to now.
func calculateSequenceByteSize(now time.Time, sd SequenceDefinition, rest offsetrange.Restriction) int64 {
nowIndex := int64(now.Sub(mtime.Time(sd.Start).ToTime()) / sd.Interval)
if nowIndex < rest.Start {
return 0
Expand All @@ -85,7 +85,7 @@ func (fn *sequenceGenDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.Lock
}

func (fn *sequenceGenDoFn) RestrictionSize(sd SequenceDefinition, rest offsetrange.Restriction) float64 {
return float64(CalculateByteSizeOfSequence(time.Now(), sd, rest))
return float64(calculateSequenceByteSize(time.Now(), sd, rest))
}

func (fn *sequenceGenDoFn) SplitRestriction(_ SequenceDefinition, rest offsetrange.Restriction) []offsetrange.Restriction {
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/transforms/periodic/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestSize(t *testing.T) {
}

for _, test := range sizeTests {
got := CalculateByteSizeOfSequence(
got := calculateSequenceByteSize(
time.Unix(test.now, 0),
sd,
offsetrange.Restriction{
Expand Down
10 changes: 7 additions & 3 deletions sdks/python/apache_beam/transforms/periodicsequence.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,17 @@ def create_tracker(self, restriction):
return OffsetRestrictionTracker(restriction)

def restriction_size(self, element, restriction):
return sequence_backlog_bytes(element, time.time(), restriction)
return _sequence_backlog_bytes(element, time.time(), restriction)

# On drain, immediately stop emitting new elements
def truncate(self, unused_element, unused_restriction):
return None


def sequence_backlog_bytes(element, now, offset_range):
# Find the # of outputs expected for the overlap of offset_range and [-inf, now)
def _sequence_backlog_bytes(element, now, offset_range):
'''
Calculates the size of the output that the sequence should have emitted up to now.
'''
start, _, interval = element
if isinstance(start, Timestamp):
start = start.micros / 1000000
Expand All @@ -66,6 +68,8 @@ def sequence_backlog_bytes(element, now, offset_range):
now_index = math.floor((now - start) / interval)
if now_index < offset_range.start:
return 0
# We attempt to be precise because some runners scale based upon backlog
# bytes and output byte throughput.
return 8 * (min(offset_range.stop, now_index) - offset_range.start)


Expand Down
16 changes: 8 additions & 8 deletions sdks/python/apache_beam/transforms/periodicsequence_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from apache_beam.testing.util import equal_to
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.periodicsequence import PeriodicSequence
from apache_beam.transforms.periodicsequence import sequence_backlog_bytes
from apache_beam.transforms.periodicsequence import _sequence_backlog_bytes

# Disable frequent lint warning due to pipe operator for chaining transforms.
# pylint: disable=expression-not-assigned
Expand Down Expand Up @@ -117,20 +117,20 @@ def test_periodicsequence_outputs_valid_sequence_in_past(self):
def test_periodicsequence_output_size(self):
element = [0, 1000000000, 10]
self.assertEqual(
sequence_backlog_bytes(element, 100, OffsetRange(10, 100000000)), 0)
_sequence_backlog_bytes(element, 100, OffsetRange(10, 100000000)), 0)
self.assertEqual(
sequence_backlog_bytes(element, 100, OffsetRange(9, 100000000)), 8)
_sequence_backlog_bytes(element, 100, OffsetRange(9, 100000000)), 8)
self.assertEqual(
sequence_backlog_bytes(element, 100, OffsetRange(8, 100000000)), 16)
_sequence_backlog_bytes(element, 100, OffsetRange(8, 100000000)), 16)
self.assertEqual(
sequence_backlog_bytes(element, 101, OffsetRange(9, 100000000)), 8)
_sequence_backlog_bytes(element, 101, OffsetRange(9, 100000000)), 8)
self.assertEqual(
sequence_backlog_bytes(element, 10000, OffsetRange(0, 100000000)),
_sequence_backlog_bytes(element, 10000, OffsetRange(0, 100000000)),
8 * 10000 / 10)
self.assertEqual(
sequence_backlog_bytes(element, 10000, OffsetRange(1002, 1003)), 0)
_sequence_backlog_bytes(element, 10000, OffsetRange(1002, 1003)), 0)
self.assertEqual(
sequence_backlog_bytes(element, 10100, OffsetRange(1002, 1003)), 8)
_sequence_backlog_bytes(element, 10100, OffsetRange(1002, 1003)), 8)


if __name__ == '__main__':
Expand Down

0 comments on commit 723190a

Please sign in to comment.