[Bug]: cloudpickle appears to incorrectly unpickle cloned combiners #26209

Closed
1 of 15 tasks
tvalentyn opened this issue Apr 10, 2023 · 8 comments
@tvalentyn
Contributor

tvalentyn commented Apr 10, 2023

What happened?

Combiner lifting and the combiner with_fanout utility both copy the portions of Beam's pipeline graph that relate to combiners. It appears that unpickling the cloudpickle-pickled bytes that encode those subgraphs results in multiple CombineFns sharing the same state, which causes side effects in combiner setup and teardown.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@tvalentyn
Contributor Author

Test failure looks like the following:

...
self = <apache_beam.transforms.combinefn_lifecycle_test.LocalCombineFnLifecycleTest_0 testMethod=test_combine>

    def test_combine(self):
>     run_combine(TestPipeline(runner=self.runner()))

apache_beam/transforms/combinefn_lifecycle_test.py:88: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
apache_beam/transforms/combinefn_lifecycle_pipeline.py:108: in run_combine
    with pipeline as p:
apache_beam/pipeline.py:600: in __exit__
    self.result = self.run()
apache_beam/testing/test_pipeline.py:116: in run
    state = result.wait_until_finish()
apache_beam/runners/direct/direct_runner.py:588: in wait_until_finish
    self._executor.await_completion()
apache_beam/runners/direct/executor.py:432: in await_completion
    self._executor.await_completion()
apache_beam/runners/direct/executor.py:480: in await_completion
    raise update.exception
apache_beam/runners/direct/executor.py:370: in call
    self.attempt_call(
apache_beam/runners/direct/executor.py:404: in attempt_call
    evaluator.start_bundle()
apache_beam/runners/direct/transform_evaluator.py:869: in start_bundle
    self.runner.setup()
apache_beam/runners/common.py:1472: in setup
    self._invoke_lifecycle_method(self.do_fn_invoker.invoke_setup)
apache_beam/runners/common.py:1468: in _invoke_lifecycle_method
    self._reraise_augmented(exn)
apache_beam/runners/common.py:1508: in _reraise_augmented
    raise new_exn.with_traceback(tb)
apache_beam/runners/common.py:1466: in _invoke_lifecycle_method
    lifecycle_method()
apache_beam/runners/common.py:552: in invoke_setup
    self.signature.setup_lifecycle_method.method_value()
apache_beam/runners/direct/helper_transforms.py:101: in setup
    self._combine_fn.setup()
apache_beam/typehints/typecheck.py:205: in setup
    self._combinefn.setup(*args, **kwargs)
apache_beam/transforms/combiners.py:843: in setup
    self.fn.setup(*self.args, **self.kwargs)
apache_beam/transforms/combiners.py:665: in setup
    c.setup(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.transforms.combinefn_lifecycle_pipeline.CallSequenceEnforcingCombineFn object at 0x7f3918796750>
args = (None,), kwargs = {}

    def setup(self, *args, **kwargs):
>     assert not self._setup_called, 'setup should not be called twice'
E     AssertionError: setup should not be called twice [while running 'Do/CombinePerKey/CombinePerKey(PreCombineFn)/ParDo(FinishCombine)']

@tvalentyn
Contributor Author

tvalentyn commented Apr 13, 2023

This test runs on the direct runners (both the bundle-based and the Portable/FnAPI direct runner), but the failure can also be reproduced with TestDataflowRunner on the counterpart ValidatesRunner test, via something like: pytest -o log_cli=True -o log_level=Info apache_beam/transforms/combinefn_lifecycle_test.py::CombineFnLifecycleTest::test_combine --test-pipeline-options='--runner=TestDataflowRunner --project=google.com:clouddfe --temp_location=gs://clouddfe-valentyn/tmp --sdk_location=dist/apache-beam-2.47.0.dev0.tar.gz --region=us-central1 --experiments=use_runner_v2'

@tvalentyn
Contributor Author

The failure does not happen if I manually modify the graph-rewriting code responsible for combiner lifting so that combiner lifting is disabled, see:

@AnandInguva also mentioned the error is not reproducible if with_fanout is disabled. with_fanout involves copying here:

clone = copy.copy(self)
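
For readers unfamiliar with why a shallow copy matters here, the following is a minimal sketch and not Beam code. `StatefulFn` and `Wrapper` are hypothetical stand-ins for a lifecycle-checking CombineFn and for the transform that holds it. It only illustrates that `copy.copy` duplicates the outer object while still pointing at the same nested object, so lifecycle state is shared between the original and the clone.

```python
import copy


class StatefulFn:
    """Hypothetical stand-in for a CombineFn that enforces its call sequence."""

    def __init__(self):
        self.setup_called = False

    def setup(self):
        assert not self.setup_called, 'setup should not be called twice'
        self.setup_called = True


class Wrapper:
    """Hypothetical stand-in for a transform that holds a CombineFn."""

    def __init__(self, fn):
        self.fn = fn


original = Wrapper(StatefulFn())
clone = copy.copy(original)  # shallow copy: a new Wrapper, the same fn object

# The clone is a distinct wrapper, but its fn attribute is the original's fn.
shares_fn = clone.fn is original.fn

original.fn.setup()
try:
    clone.fn.setup()  # same underlying object, so this counts as a second setup
    second_setup_failed = False
except AssertionError:
    second_setup_failed = True
```

Using `copy.deepcopy` in this toy example would give the clone its own `StatefulFn`. Whether that is the appropriate fix inside Beam is not something this sketch establishes.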

@AnandInguva
Contributor

@tvalentyn I can pick up the investigation from here if you are not working on it.

@AnandInguva
Contributor

AnandInguva commented Jan 8, 2024

INFO:root:<CombineOperation Do/CombinePerKey/CombinePerKey(PreCombineFn)/ExtractOutputs phase=extract>

INFO:root:<CombineOperation Do/CombinePerKey/CombinePerKey(PreCombineFn)/Merge phase=merge>

The setup calls for these two operations in bundle_processor act on the same CombineFn object, so the two operations share state.
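
As a rough illustration of how two operations can end up with one shared object after unpickling, here is a sketch that uses the standard-library `pickle` as a stand-in for cloudpickle. The dictionaries are hypothetical placeholders, not Beam's actual operation specs. Pickle memoizes objects within a single `dumps` call, so two containers that reference the same object still reference a single object after `loads`. Pickling each container separately yields independent copies.

```python
import pickle

# Hypothetical stand-in for a CombineFn's mutable lifecycle state.
fn_state = {'setup_called': False}

# Two hypothetical operation specs that reference the same object.
merge_op = {'phase': 'merge', 'fn': fn_state}
extract_op = {'phase': 'extract', 'fn': fn_state}

# Pickled together: object identity is preserved across the round trip.
together = pickle.loads(pickle.dumps([merge_op, extract_op]))
shared_when_pickled_together = together[0]['fn'] is together[1]['fn']

# Pickled separately: each operation gets its own copy of the state.
separate = [pickle.loads(pickle.dumps(op)) for op in (merge_op, extract_op)]
shared_when_pickled_separately = separate[0]['fn'] is separate[1]['fn']

# Mutating the state through one operation is visible through the other
# only in the "pickled together" case.
together[0]['fn']['setup_called'] = True
separate[0]['fn']['setup_called'] = True
visible_together = together[1]['fn']['setup_called']
visible_separately = separate[1]['fn']['setup_called']
```

This does not diagnose where the sharing arises in Beam's translation code. It only shows the general pickling behavior that makes such sharing possible.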

I will look into translations and see how these are getting pickled when with_fanout is used.

@robertwb
Contributor

Any update on this?

@liferoad
Collaborator

@claudevdm will take a look at this.

@claudevdm
Contributor

This issue should be fixed by #32598
