Skip to content

Commit

Permalink
Revert "[BEAM-14294] Worker changes to support trivial Batched DoFns (#…
Browse files Browse the repository at this point in the history
…17384)" (#17694)

This reverts commit 1c4418c.
  • Loading branch information
TheNeuralBit authored May 17, 2022
1 parent 10aeab8 commit 9b8ec45
Show file tree
Hide file tree
Showing 17 changed files with 190 additions and 1,155 deletions.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/coders/fast_coders_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
class FastCoders(unittest.TestCase):
def test_using_fast_impl(self):
try:
utils.check_compiled('apache_beam.coders.coder_impl')
utils.check_compiled('apache_beam.coders')
except RuntimeError:
self.skipTest('Cython is not installed')
# pylint: disable=wrong-import-order, wrong-import-position
Expand Down
15 changes: 1 addition & 14 deletions sdks/python/apache_beam/runners/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
cimport cython

from apache_beam.utils.windowed_value cimport WindowedValue
from apache_beam.utils.windowed_value cimport WindowedBatch
from apache_beam.transforms.cy_dataflow_distribution_counter cimport DataflowDistributionCounter

from libc.stdint cimport int64_t
Expand All @@ -29,15 +28,12 @@ cdef type TaggedOutput, TimestampedValue

cdef class Receiver(object):
cpdef receive(self, WindowedValue windowed_value)
cpdef receive_batch(self, WindowedBatch windowed_batch)
cpdef flush(self)


cdef class MethodWrapper(object):
cdef public object args
cdef public object defaults
cdef public object method_value
cdef str method_name
cdef bint has_userstate_arguments
cdef object state_args_to_replace
cdef object timer_args_to_replace
Expand All @@ -54,7 +50,6 @@ cdef class MethodWrapper(object):

cdef class DoFnSignature(object):
cdef public MethodWrapper process_method
cdef public MethodWrapper process_batch_method
cdef public MethodWrapper start_bundle_method
cdef public MethodWrapper finish_bundle_method
cdef public MethodWrapper setup_lifecycle_method
Expand All @@ -63,7 +58,6 @@ cdef class DoFnSignature(object):
cdef public MethodWrapper initial_restriction_method
cdef public MethodWrapper create_tracker_method
cdef public MethodWrapper split_method
cdef public object batching_configuration
cdef public object do_fn
cdef public object timer_methods
cdef bint _is_stateful_dofn
Expand All @@ -87,22 +81,17 @@ cdef class DoFnInvoker(object):

cdef class SimpleInvoker(DoFnInvoker):
cdef object process_method
cdef object process_batch_method


cdef class PerWindowInvoker(DoFnInvoker):
cdef list side_inputs
cdef DoFnContext context
cdef list args_for_process
cdef dict kwargs_for_process
cdef list placeholders_for_process
cdef list args_for_process_batch
cdef dict kwargs_for_process_batch
cdef list placeholders_for_process_batch
cdef list placeholders
cdef bint has_windowed_inputs
cdef bint cache_globally_windowed_args
cdef object process_method
cdef object process_batch_method
cdef bint is_splittable
cdef object threadsafe_restriction_tracker
cdef object threadsafe_watermark_estimator
Expand Down Expand Up @@ -136,8 +125,6 @@ cdef class _OutputProcessor(OutputProcessor):
cdef Receiver main_receivers
cdef object tagged_receivers
cdef DataflowDistributionCounter per_element_output_counter
cdef object output_batch_converter

@cython.locals(windowed_value=WindowedValue,
output_element_count=int64_t)
cpdef process_outputs(self, WindowedValue element, results,
Expand Down
Loading

0 comments on commit 9b8ec45

Please sign in to comment.