Skip to content

Commit

Permalink
Avoid losing config updates in DbResourceGroupConfigurationManager
Browse files Browse the repository at this point in the history
Consider the following sequence:

1. Start with configuredGroups being empty.
2. Load resource group configuration C1, which includes template T1.
3. Select template T1 and expand it into resource group R1.
4. Obtain C1 in DbResourceGroupConfigurationManager#configure.
5. Load resource group configuration C2 with modified parameters for T1.
6. Create a new mapping: configuredGroups[T1 -> R1], then configure R1
   using C1.

As a result, R1 will miss C2. Another configuration update (C3) will be
required to apply changes.
  • Loading branch information
piotrrzysko authored and hashhar committed Oct 15, 2024
1 parent b1f5665 commit 6030124
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 42 deletions.
6 changes: 6 additions & 0 deletions plugin/trino-resource-group-managers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-testing-services</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -57,6 +56,8 @@
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.units.Duration.succinctNanos;
import static io.trino.spi.StandardErrorCode.CONFIGURATION_INVALID;
Expand All @@ -71,8 +72,7 @@ public class DbResourceGroupConfigurationManager

private final Optional<LifeCycleManager> lifeCycleManager;
private final ResourceGroupsDao dao;
@GuardedBy("this")
private Map<ResourceGroupIdTemplate, ResourceGroupSpec> resourceGroupSpecs = new HashMap<>();
private final Map<ResourceGroupId, ResourceGroupSpec> specsUsedToConfigureGroups = new ConcurrentHashMap<>();
private final ResourceGroupToTemplateMap configuredGroups = new ResourceGroupToTemplateMap();
private final AtomicReference<List<ResourceGroupSpec>> rootGroups = new AtomicReference<>(ImmutableList.of());
private final AtomicReference<List<ResourceGroupSelector>> selectors = new AtomicReference<>();
Expand Down Expand Up @@ -155,6 +155,13 @@ protected List<ResourceGroupSpec> getRootGroups()
return rootGroups.get();
}

@Override
protected void configureGroup(ResourceGroup group, ResourceGroupSpec groupSpec)
{
super.configureGroup(group, groupSpec);
specsUsedToConfigureGroups.put(group.getId(), groupSpec);
}

@PreDestroy
public void destroy()
{
Expand Down Expand Up @@ -221,17 +228,11 @@ public synchronized void load()
try {
Map.Entry<ManagerSpec, Map<ResourceGroupIdTemplate, ResourceGroupSpec>> specsFromDb = buildSpecsFromDb();
ManagerSpec managerSpec = specsFromDb.getKey();
Map<ResourceGroupIdTemplate, ResourceGroupSpec> resourceGroupSpecs = specsFromDb.getValue();
Set<ResourceGroupIdTemplate> changedSpecs = new HashSet<>();
Set<ResourceGroupIdTemplate> deletedSpecs = Sets.difference(this.resourceGroupSpecs.keySet(), resourceGroupSpecs.keySet());
Map<ResourceGroupIdTemplate, ResourceGroupSpec> newResourceGroupSpecs = specsFromDb.getValue();
Map<ResourceGroupIdTemplate, Set<ResourceGroup>> templateToGroup = configuredGroups.getAllTemplateToGroupsMappings();
Map<ResourceGroup, ResourceGroupSpec> changedGroups = findChangedGroups(templateToGroup, newResourceGroupSpecs);
Set<ResourceGroup> deletedGroups = findDeletedGroups(templateToGroup, newResourceGroupSpecs);

for (Map.Entry<ResourceGroupIdTemplate, ResourceGroupSpec> entry : resourceGroupSpecs.entrySet()) {
if (!entry.getValue().sameConfig(this.resourceGroupSpecs.get(entry.getKey()))) {
changedSpecs.add(entry.getKey());
}
}

this.resourceGroupSpecs = resourceGroupSpecs;
this.cpuQuotaPeriod.set(managerSpec.getCpuQuotaPeriod());
this.rootGroups.set(managerSpec.getRootGroups());
List<ResourceGroupSelector> selectors = buildSelectors(managerSpec);
Expand All @@ -245,19 +246,19 @@ public synchronized void load()
this.selectors.set(selectors);
}

configureChangedGroups(changedSpecs);
disableDeletedGroups(deletedSpecs);
configureChangedGroups(changedGroups);
disableDeletedGroups(deletedGroups);

if (lastRefresh.get() > 0) {
for (ResourceGroupIdTemplate deleted : deletedSpecs) {
log.info("Resource group spec deleted %s", deleted);
for (ResourceGroup deleted : deletedGroups) {
log.info("Resource group deleted '%s'", deleted.getId());
}
for (ResourceGroupIdTemplate changed : changedSpecs) {
log.info("Resource group spec %s changed to %s", changed, resourceGroupSpecs.get(changed));
for (Map.Entry<ResourceGroup, ResourceGroupSpec> entry : changedGroups.entrySet()) {
log.info("Resource group '%s' changed to %s", entry.getKey().getId(), entry.getValue());
}
}
else {
log.info("Loaded %s selectors and %s resource groups from database", this.selectors.get().size(), this.resourceGroupSpecs.size());
log.info("Loaded %s selectors and %s resource groups from database", this.selectors.get().size(), this.specsUsedToConfigureGroups.size());
}

lastRefresh.set(System.nanoTime());
Expand All @@ -271,6 +272,39 @@ public synchronized void load()
}
}

private Map<ResourceGroup, ResourceGroupSpec> findChangedGroups(
Map<ResourceGroupIdTemplate, Set<ResourceGroup>> templateToGroups,
Map<ResourceGroupIdTemplate, ResourceGroupSpec> newResourceGroupSpecs)
{
ImmutableMap.Builder<ResourceGroup, ResourceGroupSpec> changedGroups = ImmutableMap.builder();
for (Map.Entry<ResourceGroupIdTemplate, Set<ResourceGroup>> entry : templateToGroups.entrySet()) {
ResourceGroupSpec newSpec = newResourceGroupSpecs.get(entry.getKey());
if (newSpec != null) {
Set<ResourceGroup> changedGroupsForCurrentTemplate = entry.getValue().stream()
.filter(resourceGroupId -> {
ResourceGroupSpec previousSpec = specsUsedToConfigureGroups.get(resourceGroupId.getId());
return previousSpec == null || !previousSpec.sameConfig(newSpec);
})
.collect(toImmutableSet());
for (ResourceGroup group : changedGroupsForCurrentTemplate) {
changedGroups.put(group, newSpec);
}
}
}
return changedGroups.buildOrThrow();
}

private Set<ResourceGroup> findDeletedGroups(
Map<ResourceGroupIdTemplate, Set<ResourceGroup>> templateToGroups,
Map<ResourceGroupIdTemplate, ResourceGroupSpec> newResourceGroupSpecs)
{
return templateToGroups.entrySet().stream()
.filter(entry -> !newResourceGroupSpecs.containsKey(entry.getKey()))
.flatMap(entry -> entry.getValue().stream())
.filter(resourceGroup -> !resourceGroup.isDisabled())
.collect(toImmutableSet());
}

// Populate temporary data structures to build resource group specs and selectors from db
private synchronized void populateFromDbHelper(Map<Long, ResourceGroupSpecBuilder> recordMap,
Set<Long> rootGroupIds,
Expand Down Expand Up @@ -356,23 +390,21 @@ private synchronized Map.Entry<ManagerSpec, Map<ResourceGroupIdTemplate, Resourc
return new AbstractMap.SimpleImmutableEntry<>(managerSpec, resourceGroupSpecs);
}

private synchronized void configureChangedGroups(Set<ResourceGroupIdTemplate> changedSpecs)
private synchronized void configureChangedGroups(Map<ResourceGroup, ResourceGroupSpec> changedGroups)
{
for (ResourceGroupIdTemplate resourceGroupIdTemplate : changedSpecs) {
for (ResourceGroup group : configuredGroups.get(resourceGroupIdTemplate)) {
synchronized (getRootGroup(group.getId())) {
configureGroup(group, resourceGroupSpecs.get(resourceGroupIdTemplate));
}
for (Map.Entry<ResourceGroup, ResourceGroupSpec> entry : changedGroups.entrySet()) {
ResourceGroup group = entry.getKey();
ResourceGroupSpec groupSpec = entry.getValue();
synchronized (getRootGroup(group.getId())) {
configureGroup(group, groupSpec);
}
}
}

private synchronized void disableDeletedGroups(Set<ResourceGroupIdTemplate> deletedSpecs)
private synchronized void disableDeletedGroups(Set<ResourceGroup> deletedGroups)
{
for (ResourceGroupIdTemplate resourceGroupIdTemplate : deletedSpecs) {
for (ResourceGroup group : configuredGroups.get(resourceGroupIdTemplate)) {
group.setDisabled(true);
}
for (ResourceGroup group : deletedGroups) {
group.setDisabled(true);
}
}

Expand Down Expand Up @@ -409,7 +441,7 @@ public void shutdown()
*/
private static class ResourceGroupToTemplateMap
{
private final Map<ResourceGroupId, ResourceGroup> groups = new HashMap<>();
private final Map<ResourceGroupId, ResourceGroup> groups = new ConcurrentHashMap<>();
private final Map<ResourceGroupId, ResourceGroupIdTemplate> groupIdToTemplate = new HashMap<>();
private final Map<ResourceGroupIdTemplate, Set<ResourceGroup>> templateToGroups = new HashMap<>();

Expand All @@ -427,22 +459,19 @@ synchronized void put(ResourceGroupIdTemplate newTemplate, ResourceGroup group)
templateToGroups.get(previousTemplate)
.remove(group);
}
templateToGroups.computeIfAbsent(newTemplate, _ -> ConcurrentHashMap.newKeySet())
templateToGroups.computeIfAbsent(newTemplate, _ -> new HashSet<>())
.add(group);
}

synchronized List<ResourceGroup> get(ResourceGroupIdTemplate idTemplate)
ResourceGroup get(ResourceGroupId groupId)
{
Set<ResourceGroup> groups = templateToGroups.get(idTemplate);
if (groups == null) {
return ImmutableList.of();
}
return ImmutableList.copyOf(groups);
return groups.get(groupId);
}

synchronized ResourceGroup get(ResourceGroupId groupId)
synchronized Map<ResourceGroupIdTemplate, Set<ResourceGroup>> getAllTemplateToGroupsMappings()
{
return groups.get(groupId);
return templateToGroups.entrySet().stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> ImmutableSet.copyOf(entry.getValue())));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.trino.spi.session.ResourceEstimates;
import org.h2.jdbc.JdbcException;
import org.jdbi.v3.core.statement.UnableToExecuteStatementException;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

Expand All @@ -36,13 +37,16 @@
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.trino.execution.resourcegroups.InternalResourceGroup.DEFAULT_WEIGHT;
import static io.trino.spi.resourcegroups.SchedulingPolicy.FAIR;
import static io.trino.spi.resourcegroups.SchedulingPolicy.WEIGHTED;
import static io.trino.testing.assertions.Assert.assertEventually;
import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -379,6 +383,61 @@ public void testMatchByUsersAndGroups()
.isEqualTo(Optional.of(new ResourceGroupIdTemplate("group")));
}

@RepeatedTest(10)
public void testConfigurationUpdateIsNotLost()
{
// This test attempts to reproduce the following sequence:
// 1. Load resource group configuration C1, which includes template T1.
// 2. For query Q1, select template T1 and expand it into resource group R1.
// 3. For query Q1, obtain C1 in DbResourceGroupConfigurationManager#configure.
// 4. Load resource group configuration C2 with modified parameters for T1.
//
// If everything works correctly, C2 should eventually be applied to R1.
// We want to avoid the following scenarios:
// - C1, obtained in step 3, overwrites C2 applied to R1 in step 4, and no subsequent
// 'load' applies C2 again.
// - The 'load' in step 4 doesn't apply C2 to R1 because 'configure' hasn't created a
// mapping between T1 and R1 yet, and no subsequent 'load' detects the configuration change for T1.

H2DaoProvider daoProvider = setup("test_lost_update");
H2ResourceGroupsDao dao = daoProvider.get();
dao.createResourceGroupsGlobalPropertiesTable();
dao.createResourceGroupsTable();
dao.createSelectorsTable();
dao.insertResourceGroup(1, "global", "80%", 10, null, 1, null, null, null, null, null, null, ENVIRONMENT);
dao.insertSelector(1, 1, null, "userGroup", null, null, null, null);
DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager(_ -> {}, new DbResourceGroupConfig(), daoProvider.get(), ENVIRONMENT);

Optional<SelectionContext<ResourceGroupIdTemplate>> userGroup = manager.match(userGroupsSelectionCriteria("userGroup"));
assertThat(userGroup.isPresent()).isTrue();
SelectionContext<ResourceGroupIdTemplate> selectionContext = userGroup.get();

InternalResourceGroup resourceGroup = new InternalResourceGroup("global", (_, _) -> {}, directExecutor());

try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
dao.updateResourceGroup(1, "global", "80%", 10, null, 10, null, null, null, null, null, null, ENVIRONMENT);

executor.submit(() -> {
synchronized (resourceGroup) {
// Wait while holding the lock to increase the likelihood that 'load' and 'configure'
// will attempt to update the configuration simultaneously. Both need to acquire this
// lock to update the resource group.
Thread.sleep(10);
}
return null;
});

executor.submit(manager::load);

manager.configure(resourceGroup, selectionContext);

assertEventually(() -> {
manager.load();
assertThat(resourceGroup.getHardConcurrencyLimit()).isEqualTo(10);
});
}
}

private static void assertEqualsResourceGroup(
InternalResourceGroup group,
String softMemoryLimit,
Expand Down

0 comments on commit 6030124

Please sign in to comment.