PARQUET-2412: Reduce excessive synchronization in MemoryManager #1240

Open · wants to merge 1 commit into master
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -46,7 +47,7 @@ public class MemoryManager {

 private final long totalMemoryPool;
 private final long minMemoryAllocation;
-private final Map<InternalParquetRecordWriter<?>, Long> writerList = new HashMap<>();
+private final Map<InternalParquetRecordWriter<?>, Long> writerList = new ConcurrentHashMap<>();
 private final Map<String, Runnable> callBacks = new HashMap<String, Runnable>();
 private double scale = 1.0;

@@ -74,7 +75,7 @@ private void checkRatio(float ratio) {
 * @param writer the new created writer
 * @param allocation the requested buffer size
 */
-synchronized void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
+void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
@ConeyLiu (Contributor), Dec 12, 2023:

This is not the same: synchronized locks the whole method, while ConcurrentHashMap only guarantees that the individual get/put calls here are thread-safe.
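A minimal sketch of the distinction being made here (simplified names, not the actual parquet-java code): a synchronized method runs the whole check-then-act sequence under one lock, while a ConcurrentHashMap only makes each individual map call atomic, so two threads can still interleave between the get and the put.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LockedRegistry {
  private final Map<String, Long> writers = new HashMap<>();

  // The instance lock covers the get, the null check, the put and any
  // follow-up rebalancing as one atomic unit.
  synchronized void addWriter(String writer, Long allocation) {
    Long old = writers.get(writer);
    if (old == null) {
      writers.put(writer, allocation);
    }
  }
}

class ConcurrentRegistry {
  private final Map<String, Long> writers = new ConcurrentHashMap<>();

  // Each map call is thread-safe on its own, but two threads can both
  // observe null here and both fall into the put branch.
  void addWriter(String writer, Long allocation) {
    Long old = writers.get(writer);
    if (old == null) {
      writers.put(writer, allocation);
    }
  }
}
```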

@ravwojdyla (Author), Dec 12, 2023:

@ConeyLiu thank you for the prompt review. Excited to get this fixed. Agreed, it's not the same; moreover, synchronized here does not lock just the method but the whole instance, which in the case of MemoryManager is the single/global instance (src), so synchronized can be very expensive. The cost shows up in the lock stats above. I can see the tests are all green. Is such "excessive" synchronization on the global instance necessary for the semantics of MemoryManager? Which use cases require this level of locking, and could it be relaxed? What do you recommend as next steps? Thanks in advance!

@ravwojdyla (Author):

👋 @ConeyLiu I hope you're doing well. I wanted to gently follow up on this thread; your input would be really valuable for moving it forward. Thank you.

Member:

I agree with @ConeyLiu that there is a behavior change. updateAllocation() is not thread-safe and may have a race condition after this change. I suspect the unit tests do not cover the case where multiple writers are added and deleted concurrently.

@ravwojdyla (Author):

Thanks for the input @wgtmac! It's unfortunate that the expected behavior is not tested; maybe this PR will be a good place to alleviate that. But first I would like to ask whether we all agree there is an issue here, as described here and as I believe is evident from the lock statistics in the comment above? And if we agree, do you have any recommendations on how to move forward?

@ravwojdyla (Author):

@parthchandra thanks 🙇 I unfortunately don't have those specific files anymore; it's been a couple of months (June 2023), apologies. If this PR continues, I may need to run another round of profiling to validate the patch, and I would be happy to share the before and after JFR files.

Contributor:

NP. Can I suggest a unit test that adds/removes writers concurrently? I'm almost certain such a test would fail because of updateAllocation. The question remains whether such concurrent access does, in fact, occur in engines like Spark or Hadoop.
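A rough sketch of the kind of test being suggested. This is not the actual parquet-java test code: the MemoryManager constructor arguments, the use of Mockito for the writers, and placing the test in the org.apache.parquet.hadoop package (so it can reach the package-private addWriter/removeWriter) are all assumptions.

```java
package org.apache.parquet.hadoop;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.Test;
import org.mockito.Mockito;

public class TestMemoryManagerConcurrency {

  @Test
  public void addAndRemoveWritersConcurrently() throws Exception {
    // Assumed constructor arguments: memory pool ratio and minimum allocation per writer.
    final MemoryManager manager = new MemoryManager(0.5f, 0L);
    ExecutorService pool = Executors.newFixedThreadPool(8);
    List<Future<?>> results = new ArrayList<>();

    for (int t = 0; t < 8; t++) {
      results.add(pool.submit(() -> {
        for (int i = 0; i < 1_000; i++) {
          // Mocked because InternalParquetRecordWriter is awkward to construct in a unit test.
          InternalParquetRecordWriter<?> writer = Mockito.mock(InternalParquetRecordWriter.class);
          manager.addWriter(writer, 1024L);
          manager.removeWriter(writer);
        }
      }));
    }

    for (Future<?> r : results) {
      r.get(); // surfaces any exception thrown by a racing updateAllocation()
    }
    pool.shutdownNow();
  }
}
```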

Contributor:

Does this have a big impact on end-to-end performance? I think it should only happen when you are writing a file with many small partitions.

@ravwojdyla (Author), Dec 19, 2023:

@ConeyLiu yes it does. One of our production Spark jobs took close to 2 hours with partitioned Parquet output and about 45 minutes without. Also, please see the stats in the JIRA issue:

"I think it should only happen when you are writing a file with many small partitions."

In that particular test the partitions were probably relatively small.

Contributor:

Thanks for sharing, @ravwojdyla. I think the change should keep thread safety; it needs more UTs to verify this.

Long oldValue = writerList.get(writer);
if (oldValue == null) {
Contributor:

I think this can be updated concurrently: the get and the put are two separate calls, so this is not an atomic update.

Contributor:

Why not computeIfAbsent?

Contributor:

putIfAbsent() and only update the allocation if it was actually put in the map? It's probably that allocation code which is holding on to the lock for so long. But that updateAllocation() operation is possibly the slow part, and it now has to be thread-safe if the callers are no longer locked.
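A minimal sketch of the suggestion above (not the actual patch; the duplicate-registration handling in the real addWriter is not shown in this excerpt and is only approximated here). putIfAbsent() makes the insertion atomic and tells the caller whether this thread actually registered the writer; updateAllocation() would still need its own synchronization.

```java
void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
  // Atomic insert: returns null only for the one thread that actually added the entry.
  Long oldValue = writerList.putIfAbsent(writer, allocation);
  if (oldValue != null) {
    // Writer was already registered; mirror whatever the existing duplicate handling does.
    throw new IllegalArgumentException("Writer is already registered: " + writer);
  }
  // Only the inserting thread reaches this point, but updateAllocation() itself
  // still needs to be made thread-safe (e.g. guarded by a dedicated lock).
  updateAllocation();
}
```

computeIfAbsent(writer, w -> allocation) would also make the insertion atomic, but it returns the existing-or-new value either way, so putIfAbsent() is the more direct fit when the caller needs to know whether it actually performed the insert.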

writerList.put(writer, allocation);
@@ -92,7 +93,7 @@ synchronized void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
 *
 * @param writer the writer that has been closed
 */
-synchronized void removeWriter(InternalParquetRecordWriter<?> writer) {
+void removeWriter(InternalParquetRecordWriter<?> writer) {
writerList.remove(writer);
if (!writerList.isEmpty()) {
updateAllocation();