PARQUET-2412: Reduce excessive synchronization in MemoryManager #1240

Open · wants to merge 1 commit into master

Conversation

ravwojdyla

The surface of the synchronized methods is unnecessarily large, which can be suboptimal with a large number of concurrent writers: threads may starve each other for access to the writerList (if writers are being created and closed dynamically). Please see the profile reports in the Jira issue. The fix in this PR is to use a ConcurrentHashMap for MemoryManager's writerList instead of synchronized methods.
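
For reference, here is a minimal sketch of the idea, with simplified types; the field and method names follow the existing MemoryManager, but this is not the exact patch:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MemoryManagerSketch {
  // Before: writerList was a plain map and every accessor was a synchronized
  // instance method, so all callers contended on the single global instance's monitor.
  // After: a ConcurrentHashMap lets individual get/put/remove calls proceed
  // without locking the whole MemoryManager.
  private final Map<Object, Long> writerList = new ConcurrentHashMap<>();

  void addWriter(Object writer, Long allocation) {
    Long oldValue = writerList.get(writer);
    if (oldValue == null) {
      writerList.put(writer, allocation);
    } else {
      throw new IllegalArgumentException("Writer already registered: " + writer);
    }
    updateAllocation(); // no longer runs under the instance lock
  }

  void removeWriter(Object writer) {
    writerList.remove(writer);
    if (!writerList.isEmpty()) {
      updateAllocation();
    }
  }

  private void updateAllocation() {
    // Recomputes each writer's row-group budget from the memory pool size and writerList.
  }
}
```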

Jira

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason: there are already tests for MemoryManager.

Commits

Style

  • My contribution adheres to the code style guidelines and Spotless passes.

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.

The surface of the synchronized methods is unnecessarily large, which
can be suboptimal with a large number of concurrent writers: threads
may starve each other for access to the writerList. Use a
ConcurrentHashMap instead of synchronized methods.
@ravwojdyla (Author) commented Dec 11, 2023

To validate this change, I tried writing a partitioned dataset (32 concurrent processes, 200 files per 32 partitions) with and without this patch. Here are the lock stats without the patch:

[image: lock statistics without the patch]

And here are the stats with the patch:

[image: lock statistics with the patch]

You can see that MemoryManager, which previously was the main offender, is gone after the patch.

@@ -74,7 +75,7 @@ private void checkRatio(float ratio) {
    * @param writer the new created writer
    * @param allocation the requested buffer size
    */
-  synchronized void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
+  void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
@ConeyLiu (Contributor) commented Dec 12, 2023

This is not the same. The synchronized keyword locks the whole method, while the ConcurrentHashMap only guarantees safe get/put here.

@ravwojdyla (Author) commented Dec 12, 2023

@ConeyLiu thank you for the prompt review. Excited to get this fixed. I agree it's not the same; furthermore, synchronized is not just locking the whole method but the whole instance, which in the case of MemoryManager is a single/global instance (src), so synchronized can be very expensive here. The cost shows up in the lock stats above. I can see the tests are all green. Is such an "excessive" synchronization pattern on the global instance necessary for the semantics of MemoryManager? Which use cases require this level of locking, and could it be relaxed? What do you recommend as next steps? Thanks in advance!
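
For illustration only (not MemoryManager code): a synchronized instance method acquires the monitor of `this`, so on a single global instance every synchronized method serializes against every other one:

```java
class Example {
  // These two methods are equivalent: both acquire the monitor of `this`.
  // On a single global instance, any pair of synchronized methods therefore
  // blocks each other, even when they touch unrelated state.
  synchronized void a() {
    // ... critical section ...
  }

  void b() {
    synchronized (this) {
      // ... critical section ...
    }
  }
}
```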

@ravwojdyla (Author) commented:

👋 @ConeyLiu I hope you're doing well. I wanted to gently follow up on this thread. Your input would be really valuable for moving forward. Thank you.

@wgtmac (Member) commented:

I agree with @ConeyLiu that there is a behavior change. updateAllocation() is not thread-safe and may have a race condition after this change. I suspect the unit tests do not cover the case where multiple writers are added and removed concurrently.

@ravwojdyla (Author) commented:

Thanks for the input @wgtmac! It's unfortunate that the expected behavior is not tested; maybe this PR is a good place to address that. But first I would like to ask whether we all agree there is an issue here as described, and as I believe is evident from the lock statistics in the comment above? And if we agree, do you have any recommendations on how to move forward?

@ravwojdyla (Author) commented:

@parthchandra thanks 🙇 Unfortunately I don't have those specific files anymore; it's been a couple of months actually (June 2023), apologies. If this PR continues, I may need to run another round of profiling to validate the patch; I would be happy to share the before and after JFR files.

@parthchandra (Contributor) commented:

NP. Can I suggest a unit test that adds/removes writers concurrently? I'm almost sure that such a test would fail because of updateAllocation. The question remains whether such concurrent access does, in fact, occur in engines like Spark or Hadoop.
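
A rough sketch of what that test could look like (hypothetical harness only; the real test would construct a MemoryManager with mocked InternalParquetRecordWriter instances and use JUnit assertions):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentWriterChurnSketch {
  public static void main(String[] args) throws Exception {
    final int threads = 16;
    final int iterations = 1_000;
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    CountDownLatch start = new CountDownLatch(1);

    Callable<Void> task = () -> {
      start.await();
      for (int i = 0; i < iterations; i++) {
        Object writer = new Object(); // stands in for a mocked InternalParquetRecordWriter
        // memoryManager.addWriter(writer, someAllocation);  // real calls in the actual test
        // memoryManager.removeWriter(writer);
      }
      return null;
    };

    List<Future<Void>> futures = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      futures.add(pool.submit(task));
    }
    start.countDown();
    for (Future<Void> f : futures) {
      f.get(); // propagates any exception thrown inside addWriter/removeWriter
    }
    pool.shutdown();
    // The real test would additionally assert that every writer's allocation stays
    // consistent with the memory pool, which is where a racy updateAllocation() shows up.
  }
}
```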

@ConeyLiu (Contributor) commented:

Does this have a big impact on the end-to-end performance? I think it should only happen when you are writing a file with many small partitions.

@ravwojdyla (Author) commented Dec 19, 2023

@ConeyLiu yes it does. One of our production Spark jobs took close to 2 hours with partitioned Parquet output and about 45 minutes without it. Also please see the stats in the JIRA issue:

> I think it should only happen when you are writing a file with many small partitions.

In that particular test the partitions were probably relatively small.

@ConeyLiu (Contributor) commented:

Thanks for sharing, @ravwojdyla. I think the changes should preserve the safety guarantees, but it needs more UTs to verify this.

@@ -74,7 +75,7 @@ private void checkRatio(float ratio) {
    * @param writer the new created writer
    * @param allocation the requested buffer size
    */
-  synchronized void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
+  void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
     Long oldValue = writerList.get(writer);
     if (oldValue == null) {
Contributor commented:

I think this could be updated concurrently. The get and put are two separate calls, so this is not an atomic update.
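
To spell out the window (illustrative interleaving only): two threads calling addWriter with the same writer can both observe null before either put lands, so the duplicate-writer guard never fires:

```java
// Thread A                                   Thread B
// oldValue = writerList.get(writer);  // -> null
//                                             oldValue = writerList.get(writer);  // -> null
// writerList.put(writer, allocationA);
//                                             writerList.put(writer, allocationB);
// Both threads fall through the "oldValue == null" branch; the second put silently
// overwrites the first, and the IllegalArgumentException guard never fires.
```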

Contributor commented:

Why not computeIfAbsent?

Contributor commented:

putIfAbsent(), and only update the allocation if the writer was actually put in the map? It's probably that allocation code which is holding on to the lock for so long. But that updateAllocation() operation is possibly the slow part, and it now has to be thread-safe if the callers are no longer holding the lock.
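
A sketch of that shape (an assumption of how it could look, not a tested patch): the insert becomes atomic via putIfAbsent, and updateAllocation keeps its own coordination because it rescales every entry:

```java
import java.util.concurrent.ConcurrentHashMap;

class AddWriterSketch {
  private final ConcurrentHashMap<Object, Long> writerList = new ConcurrentHashMap<>();

  void addWriter(Object writer, Long allocation) {
    // putIfAbsent is atomic on ConcurrentHashMap: it returns the previous value,
    // or null if this call actually registered the writer.
    Long previous = writerList.putIfAbsent(writer, allocation);
    if (previous != null) {
      throw new IllegalArgumentException("Writer already registered: " + writer);
    }
    updateAllocation();
  }

  // updateAllocation() walks the whole map and rescales every writer's budget, so it
  // still needs its own coordination (shown here as a synchronized method) once the
  // callers no longer hold the instance lock.
  private synchronized void updateAllocation() {
    // ... rescale allocations based on writerList.size() and the memory pool ...
  }
}
```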
