Skip to content

Commit

Permalink
Do not enforce memory limits for fault tolerant queries
Browse files Browse the repository at this point in the history
LowMemoryKiller is responsible for freeing up the memory if necessary
  • Loading branch information
arhimondr committed Apr 6, 2022
1 parent 664a2bf commit 9488328
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.trino.memory.LocalMemoryManager;
import io.trino.memory.NodeMemoryConfig;
import io.trino.memory.QueryContext;
import io.trino.operator.RetryPolicy;
import io.trino.spi.QueryId;
import io.trino.spi.TrinoException;
import io.trino.spi.VersionEmbedder;
Expand Down Expand Up @@ -71,9 +72,11 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.trino.SystemSessionProperties.getQueryMaxMemoryPerNode;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.execution.SqlTask.createSqlTask;
import static io.trino.operator.RetryPolicy.TASK;
import static io.trino.spi.StandardErrorCode.ABANDONED_TASK;
import static io.trino.spi.StandardErrorCode.SERVER_SHUTTING_DOWN;
import static java.lang.Math.min;
Expand Down Expand Up @@ -394,12 +397,20 @@ private TaskInfo doUpdateTask(
SqlTask sqlTask = tasks.getUnchecked(taskId);
QueryContext queryContext = sqlTask.getQueryContext();
if (!queryContext.isMemoryLimitsInitialized()) {
long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();
RetryPolicy retryPolicy = getRetryPolicy(session);
if (retryPolicy == TASK) {
// Memory limit for fault tolerant queries should only be enforced by the MemoryPool.
// LowMemoryKiller is responsible for freeing up the MemoryPool if necessary.
queryContext.initializeMemoryLimits(false, /* unlimited */ Long.MAX_VALUE);
}
else {
long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();

// Session properties are only allowed to decrease memory limits, not increase them
queryContext.initializeMemoryLimits(
resourceOvercommit(session),
min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode));
// Session properties are only allowed to decrease memory limits, not increase them
queryContext.initializeMemoryLimits(
resourceOvercommit(session),
min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode));
}
}

sqlTask.recordHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.trino.memory.LowMemoryKiller.QueryMemoryInfo;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.operator.RetryPolicy;
import io.trino.server.BasicQueryInfo;
import io.trino.server.ServerConfig;
import io.trino.spi.QueryId;
Expand Down Expand Up @@ -76,6 +77,7 @@
import static io.trino.SystemSessionProperties.RESOURCE_OVERCOMMIT;
import static io.trino.SystemSessionProperties.getQueryMaxMemory;
import static io.trino.SystemSessionProperties.getQueryMaxTotalMemory;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
Expand Down Expand Up @@ -191,6 +193,14 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli
boolean resourceOvercommit = resourceOvercommit(query.getSession());
long userMemoryReservation = query.getUserMemoryReservation().toBytes();
long totalMemoryReservation = query.getTotalMemoryReservation().toBytes();
totalUserMemoryBytes += userMemoryReservation;
totalMemoryBytes += totalMemoryReservation;

if (getRetryPolicy(query.getSession()) == RetryPolicy.TASK) {
// Memory limit for fault tolerant queries should only be enforced by the MemoryPool.
// LowMemoryKiller is responsible for freeing up the MemoryPool if necessary.
continue;
}

if (resourceOvercommit && outOfMemory) {
// If a query has requested resource overcommit, only kill it if the cluster has run out of memory
Expand All @@ -213,9 +223,6 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli
queryKilled = true;
}
}

totalUserMemoryBytes += userMemoryReservation;
totalMemoryBytes += totalMemoryReservation;
}

clusterUserMemoryReservation.set(totalUserMemoryBytes);
Expand Down
12 changes: 12 additions & 0 deletions docs/src/main/sphinx/admin/properties-resource-management.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ by the hash tables built during execution, memory used during sorting, etc.
When the user memory allocation of a query on any worker hits this limit,
it is killed.

.. note::

Does not apply for queries with task level retries enabled (``retry-policy=TASK``)

``query.max-memory``
^^^^^^^^^^^^^^^^^^^^

Expand All @@ -37,6 +41,10 @@ by the hash tables built during execution, memory used during sorting, etc.
When the user memory allocation of a query across all workers hits this limit
it is killed.

.. note::

Does not apply for queries with task level retries enabled (``retry-policy=TASK``)

``query.max-total-memory``
^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand All @@ -48,6 +56,10 @@ including revocable memory. When the memory allocated by a query across all
workers hits this limit it is killed. The value of ``query.max-total-memory``
must be greater than ``query.max-memory``.

.. note::

Does not apply for queries with task level retries enabled (``retry-policy=TASK``)

``memory.heap-headroom-per-node``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down

0 comments on commit 9488328

Please sign in to comment.