Skip to content

Commit

Permalink
Fix QueryContext.getAdditionalFailureInfo bug
Browse files Browse the repository at this point in the history
For queryAllocations(a map) that tracks all the Operator and
its consuming memory for per query, it consists of a special key
FORCE_FREE_OPERATION with a negative number as its value
for memory management purpose. When reading queryAllocations,
we should skip key FORCE_FREE_OPERATION.
  • Loading branch information
kewang1024 authored and Rongrong Zhong committed Aug 22, 2019
1 parent 5a4c520 commit 1fc8e5b
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.spi.memory.MemoryPoolInfo;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
Expand All @@ -34,11 +33,14 @@
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;

import static com.facebook.presto.memory.context.AbstractAggregatedMemoryContext.FORCE_FREE_TAG;
import static com.facebook.presto.operator.Operator.NOT_BLOCKED;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;

public class MemoryPool
{
Expand Down Expand Up @@ -351,6 +353,19 @@ private synchronized void updateTaggedMemoryAllocations(QueryId queryId, String
@VisibleForTesting
synchronized Map<QueryId, Map<String, Long>> getTaggedMemoryAllocations()
{
return ImmutableMap.copyOf(taggedMemoryAllocations);
return taggedMemoryAllocations.keySet().stream()
.collect(toImmutableMap(identity(), this::getTaggedMemoryAllocations));
}

@VisibleForTesting
synchronized Map<String, Long> getTaggedMemoryAllocations(QueryId targetQueryId)
{
if (taggedMemoryAllocations.get(targetQueryId) == null) {
return null;
}
return taggedMemoryAllocations.get(targetQueryId)
.entrySet().stream()
.filter(entry -> !entry.getKey().equals(FORCE_FREE_TAG))
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private void enforceTotalMemoryLimit(long allocated, long delta, long maxMemory)
@GuardedBy("this")
private String getAdditionalFailureInfo(long allocated, long delta)
{
Map<String, Long> queryAllocations = memoryPool.getTaggedMemoryAllocations().get(queryId);
Map<String, Long> queryAllocations = memoryPool.getTaggedMemoryAllocations(queryId);

String additionalInfo = format("Allocated: %s, Delta: %s", succinctBytes(allocated), succinctBytes(delta));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,21 @@ public void testTaggedAllocations()

testPool.reserve(testQuery, "test_tag", 10);

Map<String, Long> allocations = testPool.getTaggedMemoryAllocations().get(testQuery);
Map<String, Long> allocations = testPool.getTaggedMemoryAllocations(testQuery);
assertEquals(allocations, ImmutableMap.of("test_tag", 10L));

// free 5 bytes for test_tag
testPool.free(testQuery, "test_tag", 5);
allocations = testPool.getTaggedMemoryAllocations(testQuery);
assertEquals(allocations, ImmutableMap.of("test_tag", 5L));

testPool.reserve(testQuery, "test_tag2", 20);
allocations = testPool.getTaggedMemoryAllocations(testQuery);
assertEquals(allocations, ImmutableMap.of("test_tag", 5L, "test_tag2", 20L));

// free the remaining 5 bytes for test_tag
testPool.free(testQuery, "test_tag", 5);
allocations = testPool.getTaggedMemoryAllocations(testQuery);
assertEquals(allocations, ImmutableMap.of("test_tag2", 20L));

// free all for test_tag2
Expand All @@ -263,12 +266,12 @@ public void testMoveQuery()
MemoryPool pool2 = new MemoryPool(new MemoryPoolId("test"), new DataSize(1000, BYTE));
pool1.reserve(testQuery, "test_tag", 10);

Map<String, Long> allocations = pool1.getTaggedMemoryAllocations().get(testQuery);
Map<String, Long> allocations = pool1.getTaggedMemoryAllocations(testQuery);
assertEquals(allocations, ImmutableMap.of("test_tag", 10L));

pool1.moveQuery(testQuery, pool2);
assertNull(pool1.getTaggedMemoryAllocations().get(testQuery));
allocations = pool2.getTaggedMemoryAllocations().get(testQuery);
assertNull(pool1.getTaggedMemoryAllocations(testQuery));
allocations = pool2.getTaggedMemoryAllocations(testQuery);
assertEquals(allocations, ImmutableMap.of("test_tag", 10L));

assertEquals(pool1.getFreeBytes(), 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ public void testMoveTaggedAllocations()
LocalMemoryContext memoryContext = operatorContext.aggregateUserMemoryContext().newLocalMemoryContext("test_context");
memoryContext.setBytes(1_000);

Map<String, Long> allocations = generalPool.getTaggedMemoryAllocations().get(queryId);
Map<String, Long> allocations = generalPool.getTaggedMemoryAllocations(queryId);
assertEquals(allocations, ImmutableMap.of("test_context", 1_000L));

queryContext.setMemoryPool(reservedPool);

assertNull(generalPool.getTaggedMemoryAllocations().get(queryId));
allocations = reservedPool.getTaggedMemoryAllocations().get(queryId);
assertNull(generalPool.getTaggedMemoryAllocations(queryId));
allocations = reservedPool.getTaggedMemoryAllocations(queryId);
assertEquals(allocations, ImmutableMap.of("test_context", 1_000L));

assertEquals(generalPool.getFreeBytes(), 10_000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
import static java.lang.String.format;

@ThreadSafe
abstract class AbstractAggregatedMemoryContext
public abstract class AbstractAggregatedMemoryContext
implements AggregatedMemoryContext
{
static final ListenableFuture<?> NOT_BLOCKED = Futures.immediateFuture(null);

// When an aggregated memory context is closed, it force-frees the memory allocated by its
// children local memory contexts. Since the memory pool API enforces a tag to be used for
// reserve/free operations, we define this special tag to use with such free operations.
protected static final String FORCE_FREE_TAG = "FORCE_FREE_OPERATION";
public static final String FORCE_FREE_TAG = "FORCE_FREE_OPERATION";

@GuardedBy("this")
private long usedBytes;
Expand Down

0 comments on commit 1fc8e5b

Please sign in to comment.