diff --git a/plugin/trino-resource-group-managers/pom.xml b/plugin/trino-resource-group-managers/pom.xml
index 1423eff622f1..c3eec6d2d34a 100644
--- a/plugin/trino-resource-group-managers/pom.xml
+++ b/plugin/trino-resource-group-managers/pom.xml
@@ -210,6 +210,12 @@
test
+
+ io.trino
+ trino-testing-services
+ test
+
+
org.assertj
assertj-core
diff --git a/plugin/trino-resource-group-managers/src/main/java/io/trino/plugin/resourcegroups/db/DbResourceGroupConfigurationManager.java b/plugin/trino-resource-group-managers/src/main/java/io/trino/plugin/resourcegroups/db/DbResourceGroupConfigurationManager.java
index 75bf00107991..92eb0d9469c6 100644
--- a/plugin/trino-resource-group-managers/src/main/java/io/trino/plugin/resourcegroups/db/DbResourceGroupConfigurationManager.java
+++ b/plugin/trino-resource-group-managers/src/main/java/io/trino/plugin/resourcegroups/db/DbResourceGroupConfigurationManager.java
@@ -15,9 +15,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.log.Logger;
@@ -57,6 +56,8 @@
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.units.Duration.succinctNanos;
import static io.trino.spi.StandardErrorCode.CONFIGURATION_INVALID;
@@ -71,8 +72,7 @@ public class DbResourceGroupConfigurationManager
private final Optional lifeCycleManager;
private final ResourceGroupsDao dao;
- @GuardedBy("this")
- private Map resourceGroupSpecs = new HashMap<>();
+ private final Map specsUsedToConfigureGroups = new ConcurrentHashMap<>();
private final ResourceGroupToTemplateMap configuredGroups = new ResourceGroupToTemplateMap();
private final AtomicReference> rootGroups = new AtomicReference<>(ImmutableList.of());
private final AtomicReference> selectors = new AtomicReference<>();
@@ -155,6 +155,13 @@ protected List getRootGroups()
return rootGroups.get();
}
+ @Override
+ protected void configureGroup(ResourceGroup group, ResourceGroupSpec groupSpec)
+ {
+ super.configureGroup(group, groupSpec);
+ specsUsedToConfigureGroups.put(group.getId(), groupSpec);
+ }
+
@PreDestroy
public void destroy()
{
@@ -221,17 +228,11 @@ public synchronized void load()
try {
Map.Entry> specsFromDb = buildSpecsFromDb();
ManagerSpec managerSpec = specsFromDb.getKey();
- Map resourceGroupSpecs = specsFromDb.getValue();
- Set changedSpecs = new HashSet<>();
- Set deletedSpecs = Sets.difference(this.resourceGroupSpecs.keySet(), resourceGroupSpecs.keySet());
+ Map newResourceGroupSpecs = specsFromDb.getValue();
+ Map> templateToGroup = configuredGroups.getAllTemplateToGroupsMappings();
+ Map changedGroups = findChangedGroups(templateToGroup, newResourceGroupSpecs);
+ Set deletedGroups = findDeletedGroups(templateToGroup, newResourceGroupSpecs);
- for (Map.Entry entry : resourceGroupSpecs.entrySet()) {
- if (!entry.getValue().sameConfig(this.resourceGroupSpecs.get(entry.getKey()))) {
- changedSpecs.add(entry.getKey());
- }
- }
-
- this.resourceGroupSpecs = resourceGroupSpecs;
this.cpuQuotaPeriod.set(managerSpec.getCpuQuotaPeriod());
this.rootGroups.set(managerSpec.getRootGroups());
List selectors = buildSelectors(managerSpec);
@@ -245,19 +246,19 @@ public synchronized void load()
this.selectors.set(selectors);
}
- configureChangedGroups(changedSpecs);
- disableDeletedGroups(deletedSpecs);
+ configureChangedGroups(changedGroups);
+ disableDeletedGroups(deletedGroups);
if (lastRefresh.get() > 0) {
- for (ResourceGroupIdTemplate deleted : deletedSpecs) {
- log.info("Resource group spec deleted %s", deleted);
+ for (ResourceGroup deleted : deletedGroups) {
+ log.info("Resource group deleted '%s'", deleted.getId());
}
- for (ResourceGroupIdTemplate changed : changedSpecs) {
- log.info("Resource group spec %s changed to %s", changed, resourceGroupSpecs.get(changed));
+ for (Map.Entry entry : changedGroups.entrySet()) {
+ log.info("Resource group '%s' changed to %s", entry.getKey().getId(), entry.getValue());
}
}
else {
- log.info("Loaded %s selectors and %s resource groups from database", this.selectors.get().size(), this.resourceGroupSpecs.size());
+ log.info("Loaded %s selectors and %s resource groups from database", this.selectors.get().size(), this.specsUsedToConfigureGroups.size());
}
lastRefresh.set(System.nanoTime());
@@ -271,6 +272,39 @@ public synchronized void load()
}
}
+ private Map findChangedGroups(
+ Map> templateToGroups,
+ Map newResourceGroupSpecs)
+ {
+ ImmutableMap.Builder changedGroups = ImmutableMap.builder();
+ for (Map.Entry> entry : templateToGroups.entrySet()) {
+ ResourceGroupSpec newSpec = newResourceGroupSpecs.get(entry.getKey());
+ if (newSpec != null) {
+ Set changedGroupsForCurrentTemplate = entry.getValue().stream()
+ .filter(resourceGroupId -> {
+ ResourceGroupSpec previousSpec = specsUsedToConfigureGroups.get(resourceGroupId.getId());
+ return previousSpec == null || !previousSpec.sameConfig(newSpec);
+ })
+ .collect(toImmutableSet());
+ for (ResourceGroup group : changedGroupsForCurrentTemplate) {
+ changedGroups.put(group, newSpec);
+ }
+ }
+ }
+ return changedGroups.buildOrThrow();
+ }
+
+ private Set findDeletedGroups(
+ Map> templateToGroups,
+ Map newResourceGroupSpecs)
+ {
+ return templateToGroups.entrySet().stream()
+ .filter(entry -> !newResourceGroupSpecs.containsKey(entry.getKey()))
+ .flatMap(entry -> entry.getValue().stream())
+ .filter(resourceGroup -> !resourceGroup.isDisabled())
+ .collect(toImmutableSet());
+ }
+
// Populate temporary data structures to build resource group specs and selectors from db
private synchronized void populateFromDbHelper(Map recordMap,
Set rootGroupIds,
@@ -356,23 +390,21 @@ private synchronized Map.Entry(managerSpec, resourceGroupSpecs);
}
- private synchronized void configureChangedGroups(Set changedSpecs)
+ private synchronized void configureChangedGroups(Map changedGroups)
{
- for (ResourceGroupIdTemplate resourceGroupIdTemplate : changedSpecs) {
- for (ResourceGroup group : configuredGroups.get(resourceGroupIdTemplate)) {
- synchronized (getRootGroup(group.getId())) {
- configureGroup(group, resourceGroupSpecs.get(resourceGroupIdTemplate));
- }
+ for (Map.Entry entry : changedGroups.entrySet()) {
+ ResourceGroup group = entry.getKey();
+ ResourceGroupSpec groupSpec = entry.getValue();
+ synchronized (getRootGroup(group.getId())) {
+ configureGroup(group, groupSpec);
}
}
}
- private synchronized void disableDeletedGroups(Set deletedSpecs)
+ private synchronized void disableDeletedGroups(Set deletedGroups)
{
- for (ResourceGroupIdTemplate resourceGroupIdTemplate : deletedSpecs) {
- for (ResourceGroup group : configuredGroups.get(resourceGroupIdTemplate)) {
- group.setDisabled(true);
- }
+ for (ResourceGroup group : deletedGroups) {
+ group.setDisabled(true);
}
}
@@ -409,7 +441,7 @@ public void shutdown()
*/
private static class ResourceGroupToTemplateMap
{
- private final Map groups = new HashMap<>();
+ private final Map groups = new ConcurrentHashMap<>();
private final Map groupIdToTemplate = new HashMap<>();
private final Map> templateToGroups = new HashMap<>();
@@ -427,22 +459,19 @@ synchronized void put(ResourceGroupIdTemplate newTemplate, ResourceGroup group)
templateToGroups.get(previousTemplate)
.remove(group);
}
- templateToGroups.computeIfAbsent(newTemplate, _ -> ConcurrentHashMap.newKeySet())
+ templateToGroups.computeIfAbsent(newTemplate, _ -> new HashSet<>())
.add(group);
}
- synchronized List get(ResourceGroupIdTemplate idTemplate)
+ ResourceGroup get(ResourceGroupId groupId)
{
- Set groups = templateToGroups.get(idTemplate);
- if (groups == null) {
- return ImmutableList.of();
- }
- return ImmutableList.copyOf(groups);
+ return groups.get(groupId);
}
- synchronized ResourceGroup get(ResourceGroupId groupId)
+ synchronized Map> getAllTemplateToGroupsMappings()
{
- return groups.get(groupId);
+ return templateToGroups.entrySet().stream()
+ .collect(toImmutableMap(Map.Entry::getKey, entry -> ImmutableSet.copyOf(entry.getValue())));
}
}
}
diff --git a/plugin/trino-resource-group-managers/src/test/java/io/trino/plugin/resourcegroups/db/TestDbResourceGroupConfigurationManager.java b/plugin/trino-resource-group-managers/src/test/java/io/trino/plugin/resourcegroups/db/TestDbResourceGroupConfigurationManager.java
index 201845e213b5..e79940dcc164 100644
--- a/plugin/trino-resource-group-managers/src/test/java/io/trino/plugin/resourcegroups/db/TestDbResourceGroupConfigurationManager.java
+++ b/plugin/trino-resource-group-managers/src/test/java/io/trino/plugin/resourcegroups/db/TestDbResourceGroupConfigurationManager.java
@@ -28,6 +28,7 @@
import io.trino.spi.session.ResourceEstimates;
import org.h2.jdbc.JdbcException;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -36,6 +37,8 @@
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
@@ -43,6 +46,7 @@
import static io.trino.execution.resourcegroups.InternalResourceGroup.DEFAULT_WEIGHT;
import static io.trino.spi.resourcegroups.SchedulingPolicy.FAIR;
import static io.trino.spi.resourcegroups.SchedulingPolicy.WEIGHTED;
+import static io.trino.testing.assertions.Assert.assertEventually;
import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
@@ -379,6 +383,61 @@ public void testMatchByUsersAndGroups()
.isEqualTo(Optional.of(new ResourceGroupIdTemplate("group")));
}
+ @RepeatedTest(10)
+ public void testConfigurationUpdateIsNotLost()
+ {
+ // This test attempts to reproduce the following sequence:
+ // 1. Load resource group configuration C1, which includes template T1.
+ // 2. For query Q1, select template T1 and expand it into resource group R1.
+ // 3. For query Q1, obtain C1 in DbResourceGroupConfigurationManager#configure.
+ // 4. Load resource group configuration C2 with modified parameters for T1.
+ //
+ // If everything works correctly, C2 should eventually be applied to R1.
+ // We want to avoid the following scenarios:
+ // - C1, obtained in step 3, overwrites C2 applied to R1 in step 4, and no subsequent
+ // 'load' applies C2 again.
+ // - The 'load' in step 4 doesn't apply C2 to R1 because 'configure' hasn't created a
+ // mapping between T1 and R1 yet, and no subsequent 'load' detects the configuration change for T1.
+
+ H2DaoProvider daoProvider = setup("test_lost_update");
+ H2ResourceGroupsDao dao = daoProvider.get();
+ dao.createResourceGroupsGlobalPropertiesTable();
+ dao.createResourceGroupsTable();
+ dao.createSelectorsTable();
+ dao.insertResourceGroup(1, "global", "80%", 10, null, 1, null, null, null, null, null, null, ENVIRONMENT);
+ dao.insertSelector(1, 1, null, "userGroup", null, null, null, null);
+ DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager(_ -> {}, new DbResourceGroupConfig(), daoProvider.get(), ENVIRONMENT);
+
+ Optional> userGroup = manager.match(userGroupsSelectionCriteria("userGroup"));
+ assertThat(userGroup.isPresent()).isTrue();
+ SelectionContext selectionContext = userGroup.get();
+
+ InternalResourceGroup resourceGroup = new InternalResourceGroup("global", (_, _) -> {}, directExecutor());
+
+ try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
+ dao.updateResourceGroup(1, "global", "80%", 10, null, 10, null, null, null, null, null, null, ENVIRONMENT);
+
+ executor.submit(() -> {
+ synchronized (resourceGroup) {
+ // Wait while holding the lock to increase the likelihood that 'load' and 'configure'
+ // will attempt to update the configuration simultaneously. Both need to acquire this
+ // lock to update the resource group.
+ Thread.sleep(10);
+ }
+ return null;
+ });
+
+ executor.submit(manager::load);
+
+ manager.configure(resourceGroup, selectionContext);
+
+ assertEventually(() -> {
+ manager.load();
+ assertThat(resourceGroup.getHardConcurrencyLimit()).isEqualTo(10);
+ });
+ }
+ }
+
private static void assertEqualsResourceGroup(
InternalResourceGroup group,
String softMemoryLimit,