
[Spark Dataset runner] Break lineage of dataset to reduce Spark planning overhead in case of large query plans #25187

Merged: 1 commit merged into apache:master from the tpcds_83_2022-12-28 branch on Feb 3, 2023

Conversation

@mosche (Member) commented Jan 26, 2023

#24711 removed some unnecessary caching when processing MultiParDos with additional but unconsumed outputs. With storage level "Memory Only", caching was done via RDDs. This also had a positive side effect: it breaks the lineage of the dataset. This is particularly beneficial for complex query plans such as the one generated for TPC-DS query 83 (due to #24314).

With caching removed, performance dropped for query 83.
See http://metrics.beam.apache.org/d/tkqc0AdGk2/tpc-ds-spark-classic-new-sql?orgId=1&viewPanel=38&from=1670799600000&to=1673305199000

This change tracks a rough complexity estimate of the datasets and breaks lineage where necessary by converting a dataset to an RDD and back again.
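
A minimal sketch of this lineage-breaking idea, assuming Spark's Java API (the class and method names here are illustrative, not the actual identifiers in this PR): converting a Dataset to an RDD and back replaces the Dataset's accumulated logical plan with a plain scan over that RDD.

  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Encoder;
  import org.apache.spark.sql.SparkSession;

  class LineageSketch {
    // Round-tripping through an RDD resets the Dataset's logical plan to a
    // simple scan of that RDD, so Catalyst no longer has to analyze the full
    // upstream query plan whenever the Dataset is reused.
    static <T> Dataset<T> breakLineage(SparkSession session, Dataset<T> dataset, Encoder<T> encoder) {
      return session.createDataset(dataset.rdd(), encoder);
    }
  }

The trade-off is that Catalyst can no longer optimize across the cut, so lineage should only be broken once a plan grows large enough that planning overhead dominates.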



@mosche (Member Author) commented Jan 26, 2023

Run Spark Runner Tpcds Tests

3 similar comments followed on Jan 26 and 27, 2023, re-triggering the same tests.

@mosche force-pushed the tpcds_83_2022-12-28 branch from ecd030c to 00cd938 on January 27, 2023 09:20
@mosche changed the title from "[Spark Dataset runner] Experiment to limit execution plan complexity" to "[Spark Dataset runner] Break lineage of dataset to reduce Spark planning overhead in case of large query plans" on Jan 27, 2023
@mosche marked this pull request as ready for review on January 27, 2023 09:43
@mosche (Member Author) commented Jan 27, 2023

R: @aromanenko-dev
R: @echauchot

@github-actions (Contributor):

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@echauchot (Contributor):

@aromanenko-dev I'll be on vacation starting tonight, can you take care of this review?

@aromanenko-dev (Contributor):

@echauchot Sure, I'll take a look

@aromanenko-dev self-requested a review on January 27, 2023 17:21
@mosche (Member Author) commented Feb 1, 2023

@aromanenko-dev kind ping :)

@aromanenko-dev (Contributor) left a review comment:

Thanks, looks fine to me, only several notes; PTAL.

  TRANSFORM_TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch<>());
  TRANSFORM_TRANSLATORS.put(Combine.Globally.class, new CombineGloballyTranslatorBatch<>());
- TRANSFORM_TRANSLATORS.put(Impulse.class, new ImpulseTranslatorBatch());
+ TRANSFORM_TRANSLATORS.put(Impulse.class, new ImpulseTranslatorBatch(0));
@aromanenko-dev (Contributor) commented on the diff:

How were all these complexity factors (here and below) estimated?

@mosche (Member Author) replied:

These are mostly just relative factors comparing what the translator adds to the DAG; no exact science needed here.

@aromanenko-dev (Contributor):

Does it depend on the translated pipeline?

@mosche (Member Author):

No, it doesn't... this is only about a single PTransform (and its translation) looked at in isolation.

@aromanenko-dev (Contributor) commented Feb 1, 2023:

Then it could just be hardcoded as a constant value in every transform translator class; I don't see a reason to have a dedicated constructor for this.

@mosche (Member Author):

That's the way I implemented it initially... I personally find it easier to have an overview of all values in a single place, but happy to revert.

@mosche (Member Author):

OK, done @aromanenko-dev
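
For illustration, a rough sketch of the bookkeeping discussed in this thread; all names and the threshold value are hypothetical, and per the discussion above the actual factors live as constants in the individual translator classes:

  // Hypothetical accumulator for a dataset's complexity estimate.
  class ComplexityTracker {
    // Illustrative cut-off; the real value is a tuning decision.
    private static final int MAX_PLAN_COMPLEXITY = 100;
    private int complexity = 0;

    // Each translator contributes a fixed, relative factor describing roughly
    // how much it adds to the query DAG, independent of the pipeline.
    void add(int translatorFactor) {
      complexity += translatorFactor;
    }

    // Once the estimate crosses the threshold, the dataset is converted to an
    // RDD and back to truncate its lineage.
    boolean shouldBreakLineage() {
      return complexity > MAX_PLAN_COMPLEXITY;
    }
  }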

@mosche (Member Author) commented Feb 1, 2023

@aromanenko-dev please have another look.

@aromanenko-dev (Contributor) left a review comment:

LGTM
@aromanenko-dev (Contributor) left a second review comment:

LGTM
Feel free to self-merge it once tests are green.

@mosche (Member Author) commented Feb 1, 2023

Run Spark StructuredStreaming ValidatesRunner

1 similar comment followed.

@mosche (Member Author) commented Feb 2, 2023

Thanks @aromanenko-dev, I have to look into the ValidatesRunner tests, not sure what's going on :(

@mosche (Member Author) commented Feb 2, 2023

🤯 WTH, below is the output of the following statements; somehow rdd.persist() corrupts the data 💥

  // Evaluate the persisted dataset twice, its (unpersisted) RDD view twice,
  // and the persisted RDD twice. Per the output below, only the persisted
  // RDD evaluations return corrupted values.
  dataset.persist(storageLevel);

  System.out.println("\nContent of persisted dataset (1st eval)");
  dataset.foreach(printValue);

  System.out.println("\nContent of persisted dataset (2nd eval)");
  dataset.foreach(printValue);

  System.out.println("\nContent of rdd (1st eval)");
  dataset.rdd().foreach(printValue);

  System.out.println("\nContent of rdd (2nd eval)");
  dataset.rdd().foreach(printValue);

  System.out.println("\nContent of persisted rdd (1st eval)");
  dataset.rdd().persist().foreach(printValue);

  System.out.println("\nContent of persisted rdd (2nd eval)");
  dataset.rdd().persist().foreach(printValue);

Output:
Content of persisted dataset (1st eval)
key=k3 {@1797166500}, value=[0] {@621459392}
key=k5 {@522613115}, value=[2147483647, -2147483648] {@50229760}
key=k1 {@935655811}, value=[3, 4] {@1059339408}
key=k2 {@1698407107}, value=[66, -33] {@1059339408}

Content of persisted dataset (2nd eval)
key=k3 {@491772294}, value=[0] {@1519705721}
key=k1 {@1801297116}, value=[3, 4] {@929237815}
key=k2 {@1278247606}, value=[66, -33] {@929237815}
key=k5 {@1642777670}, value=[2147483647, -2147483648] {@822592932}

Content of rdd (1st eval)
key=k5 {@1706950017}, value=[2147483647, -2147483648] {@935984436}
key=k1 {@830413362}, value=[3, 4] {@1431942185}
key=k3 {@1807383422}, value=[0] {@1220036903}
key=k2 {@1334173112}, value=[66, -33] {@1431942185}

Content of rdd (2nd eval)
key=k1 {@1977617218}, value=[3, 4] {@1966593054}
key=k2 {@131385719}, value=[66, -33] {@1966593054}
key=k3 {@966124643}, value=[0] {@4762028}
key=k5 {@1477114665}, value=[2147483647, -2147483648] {@942837752}

Content of persisted rdd (1st eval)
key=k3 {@2050847337}, value=[0] {@83038769}
key=k5 {@906982325}, value=[2147483647, -2147483648] {@714957170}
key=k1 {@654493203}, value=[66, -33] {@1747607045}
key=k2 {@287079803}, value=[66, -33] {@1747607045}

Content of persisted rdd (2nd eval)
key=k1 {@654493203}, value=[66, -33] {@1747607045}
key=k2 {@287079803}, value=[66, -33] {@1747607045}
key=k5 {@906982325}, value=[2147483647, -2147483648] {@714957170}
key=k3 {@2050847337}, value=[0] {@83038769}

@aromanenko-dev (Contributor):

@mosche Are you sure that it's caused by your changes?

@mosche (Member Author) commented Feb 2, 2023

@aromanenko-dev It's triggered by this optional part: https://github.com/apache/beam/pull/25187/files#diff-4df56f442668d45bf7269e0bc379e95298b178c2f3072c72f30c1c0c296caed9R301-R304

I could simply remove that and keep caching as a dataset. But there are also other places where caching is done on the RDD rather than the dataset if the storage level is MEMORY_ONLY. It's a bit concerning to not know why or how this bug is triggered :(
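
For context, a hedged sketch of the two caching paths being contrasted here; class and method names are illustrative and this is not the PR's exact code:

  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Encoder;
  import org.apache.spark.sql.SparkSession;
  import org.apache.spark.storage.StorageLevel;

  class CachingSketch {
    static <T> Dataset<T> cache(
        SparkSession session, Dataset<T> dataset, Encoder<T> encoder, StorageLevel level) {
      if (StorageLevel.MEMORY_ONLY().equals(level)) {
        // RDD-level caching; as a side effect this also breaks the dataset's
        // lineage. This is the path where the corrupted values above appeared.
        return session.createDataset(dataset.rdd().persist(level), encoder);
      }
      // Dataset-level caching keeps the logical plan (and lineage) intact.
      return dataset.persist(level);
    }
  }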

@mosche (Member Author) commented Feb 3, 2023

Tracked this down to #25296, will fix the bug first and then get back to this PR.

…ng overhead in case of large complex query plans (relates to apache#24710 and apache#23845)
@mosche force-pushed the tpcds_83_2022-12-28 branch from 3116d46 to 39f241a on February 3, 2023 09:42
@mosche (Member Author) commented Feb 3, 2023

Run Spark StructuredStreaming ValidatesRunner

@mosche (Member Author) commented Feb 3, 2023

Run Java PreCommit

@mosche (Member Author) commented Feb 3, 2023

Run Spark Runner Tpcds Tests

1 similar comment followed.

@mosche merged commit 72781cb into apache:master on Feb 3, 2023
@mosche deleted the tpcds_83_2022-12-28 branch on February 3, 2023 11:42
@Abacn (Contributor) commented Mar 4, 2023

Hi @mosche and @aromanenko-dev, either this PR or #25297 likely caused the SparkStructuredStreaming Batch load test to fail since Feb 2: https://ci-beam.apache.org/view/LoadTests/job/beam_LoadTests_Java_ParDo_SparkStructuredStreaming_Batch/1016/

The error message is java.lang.StackOverflowError, from an infinite recursion. Are any actions being taken?

Update: #25297 is excluded because the StackOverflowError is still seen after reverting that PR: https://ci-beam.apache.org/view/LoadTests/job/beam_LoadTests_Java_ParDo_SparkStructuredStreaming_Batch_PR/5/

@mosche (Member Author) commented Mar 6, 2023

Thanks @Abacn, having a look.

@mosche (Member Author) commented Mar 6, 2023

Fixed here: #25732
