-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix QueryContext.getAdditionalFailureInfo bug #13120
Fix QueryContext.getAdditionalFailureInfo bug #13120
Conversation
@@ -68,6 +68,8 @@ | |||
private final long maxSpill; | |||
private final SpillSpaceTracker spillSpaceTracker; | |||
private final Map<TaskId, TaskContext> taskContexts = new ConcurrentHashMap(); | |||
// Corresponding to AbstractAggregatedMemoryContext.FORCE_FREE_TAG |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please directly use AbstractAggregatedMemoryContext.FORCE_FREE_TAG
. It's more error proof that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please directly use
AbstractAggregatedMemoryContext.FORCE_FREE_TAG
. It's more error proof that way.
They're in different project, so if we want to directly use, we need to add maven dependency. Should we add maven dependency or just use this this way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AbstractAggregatedMemoryContext.FORCE_FREE_TAG is in presto-memory-context
@@ -352,6 +354,7 @@ private String getAdditionalFailureInfo(long allocated, long delta) | |||
} | |||
|
|||
String topConsumers = queryAllocations.entrySet().stream() | |||
.filter(entry -> !entry.getKey().equals(FORCE_FREE_TAG)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another way to do this (if it's not proper to import AbstractAggregatedMemoryContext.FORCE_FREE_TAG
directly, is to make sure queryAllocations
returned by memoryPool.getTaggedMemoryAllocations().get(queryId)
do not include this entry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just chat with @nezihyigitbasi and we agreed that we should just filter this entry out in MemoryPool.getTaggedMemoryAllocations
.
52a9ac7
to
f723ecc
Compare
@@ -351,6 +353,16 @@ private synchronized void updateTaggedMemoryAllocations(QueryId queryId, String | |||
@VisibleForTesting | |||
synchronized Map<QueryId, Map<String, Long>> getTaggedMemoryAllocations() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is getting quite expensive. Let's just change this to getTaggedMemoryAllocations(queryId)
instead. If tests are not easy to fix, introduce a separate function instead so this one (with the old implementation) is only used in tests.
370f3c2
to
6956e1f
Compare
Map<String, Long> memoryMap = taggedMemoryAllocations.get(targetQueryId) | ||
.entrySet().stream() | ||
.filter(entry -> !entry.getKey().equals(FORCE_FREE_TAG)) | ||
.collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue))
.entrySet().stream() | ||
.filter(entry -> !entry.getKey().equals(FORCE_FREE_TAG)) | ||
.collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue())); | ||
return ImmutableMap.copyOf(memoryMap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to make another copy.
if (taggedMemoryAllocations.get(targetQueryId) == null) { | ||
return null; | ||
} | ||
Map<String, Long> memoryMap = taggedMemoryAllocations.get(targetQueryId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return taggedMemoryAllocations.get(...)
e212dac
to
954e7f1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry that I missed this in last review. One final thing. Thanks!
} | ||
|
||
@VisibleForTesting | ||
synchronized Map<String, Long> getTaggedMemoryAllocationsByID(QueryId targetQueryId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, missed this. You can just name this function getTaggedMemoryAllocations
. The signature is obviously suggesting it's "by ID" so it's not necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getTaggedMemoryAllocations
but we have another getTaggedMemoryAllocations which is called in the test, so I decided to preserve that function and created this new function.
how about changing to getTaggedMemoryAllocation (without the s) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine. You can just overload the function name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine. You can just overload the function name.
I see your point, Done and Done.
166f532
to
466a6d9
Compare
@@ -351,6 +352,19 @@ private synchronized void updateTaggedMemoryAllocations(QueryId queryId, String | |||
@VisibleForTesting | |||
synchronized Map<QueryId, Map<String, Long>> getTaggedMemoryAllocations() | |||
{ | |||
return ImmutableMap.copyOf(taggedMemoryAllocations); | |||
return taggedMemoryAllocations.keySet().stream() | |||
.collect(toImmutableMap(queryId -> queryId, queryId -> getTaggedMemoryAllocations(queryId))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return taggedMemoryAllocations.keySet().stream()
.collect(toImmutableMap(identity(), this::getTaggedMemoryAllocations));
466a6d9
to
c560900
Compare
@@ -351,6 +353,19 @@ private synchronized void updateTaggedMemoryAllocations(QueryId queryId, String | |||
@VisibleForTesting | |||
synchronized Map<QueryId, Map<String, Long>> getTaggedMemoryAllocations() | |||
{ | |||
return ImmutableMap.copyOf(taggedMemoryAllocations); | |||
return taggedMemoryAllocations.keySet().stream() | |||
.collect(toImmutableMap(identity(), queryId -> getTaggedMemoryAllocations(queryId))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this:: getTaggedMemoryAllocations
For queryAllocations(a map) that tracks all the Operator and its consuming memory for per query, it consists of a special key FORCE_FREE_OPERATION with a negative number as its value for memory management purpose. When reading queryAllocations, we should skip key FORCE_FREE_OPERATION.
c560900
to
c8eb80b
Compare
For queryAllocations(a map) that tracks all the Operator and
its consuming memory for per query, it consists of a special key
FORCE_FREE_OPERATION with a negative number as its value
for memory management purpose. When reading queryAllocations,
we should skip key FORCE_FREE_OPERATION.