Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23634 Make hardcoded REBALANCE_RETRY_DELAY_MS configurable #4951

Merged
merged 9 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.configuration.utils;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.configuration.SystemDistributedView;
import org.apache.ignite.internal.configuration.SystemPropertyView;

/** Holder of system distributed configuration property with auto-update and support of external listener. */
public class SystemDistributedConfigurationPropertyHolder<T> {
/** Configuration property name. */
private final String propertyName;

/** Default value. */
private final T defaultValue;

/** System distributed configuration. */
private final SystemDistributedConfiguration systemDistributedConfig;

/** Current value of target system distributed configuration property. */
private final AtomicReference<T> currentValue = new AtomicReference<>();

/** Listener, which receives (newValue, revision) on every configuration update. */
private final BiConsumer<T, Long> valueListener;

/** Converter to translate {@link String} representation of property value to target type. */
private final Function<String, T> propertyConverter;

/**
* Constructor.
*
* @param systemDistributedConfig System distributed configuration.
* @param valueListener Listener, which receives (newValue, revision) on every configuration update.
* @param propertyName Configuration property name.
* @param defaultValue Default value.
* @param propertyConverter Converter to translate {@link String} representation of property value to target type.
*/
public SystemDistributedConfigurationPropertyHolder(
SystemDistributedConfiguration systemDistributedConfig,
BiConsumer<T, Long> valueListener,
String propertyName,
T defaultValue,
Function<String, T> propertyConverter
) {
this.systemDistributedConfig = systemDistributedConfig;
this.valueListener = valueListener;
this.propertyName = propertyName;
this.defaultValue = defaultValue;
this.propertyConverter = propertyConverter;

systemDistributedConfig.listen(ctx -> {
updateSystemProperties(ctx.newValue(), ctx.storageRevision());

return CompletableFuture.completedFuture(null);
});
}

/**
* Init property value, but doesn't call the listener.
*
* <p>If this method's call or first configuration update will not occur before holder usage, it will produce a {@code null} value.
*/
public void init() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a bit more details why we need this init and when it must be called

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

updateSystemProperties(systemDistributedConfig.value(), -1);
}

/**
* Returns current value of configuration property.
*
* @return Current value.
*/
public T currentValue() {
return currentValue.get();
}

/**
* Update current value and call listener (if revision != -1).
*
* @param view System distributed view.
* @param revision Metastorage revision.
*/
private void updateSystemProperties(SystemDistributedView view, long revision) {
SystemPropertyView systemPropertyView = view.properties().get(propertyName);

T value = (systemPropertyView == null) ? defaultValue : propertyConverter.apply(systemPropertyView.propertyValue());

currentValue.set(value);

if (revision != -1) {
valueListener.accept(value, revision);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.ignite.internal.distributionzones.configuration;
package org.apache.ignite.internal.configuration.utils;

import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
Expand All @@ -27,74 +27,88 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

/** Tests for {@link DistributionZonesHighAvailabilityConfiguration}. */
/** Tests for {@link SystemDistributedConfigurationPropertyHolder}. */
@ExtendWith(ConfigurationExtension.class)
public class DistributionZonesHighAvailabilityConfigurationTest extends BaseIgniteAbstractTest {
private static final String PARTITION_DISTRIBUTION_RESET_TIMEOUT = "partitionDistributionResetTimeout";
public class SystemDistributedConfigurationPropertyHolderTest extends BaseIgniteAbstractTest {
private static final String PROPERTY_NAME = "distributedPropertyName";

private static final long PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE = 0;
private static final String DEFAULT_VALUE = "defaultValue";

private static final BiConsumer<Integer, Long> noOpConsumer = (partitionDistributionResetTimeout, revision) -> {};
private static final BiConsumer<String, Long> noOpConsumer = (value, revision) -> {};

@Test
void testEmptySystemProperties(@InjectConfiguration SystemDistributedConfiguration systemConfig) {
var config = new DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
config.startAndInit();
var config = new SystemDistributedConfigurationPropertyHolder<>(
systemConfig,
noOpConsumer,
PROPERTY_NAME,
DEFAULT_VALUE,
Function.identity()
);
config.init();

assertEquals(PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE, config.partitionDistributionResetTimeoutSeconds());
assertEquals(DEFAULT_VALUE, config.currentValue());
}

@Test
void testValidSystemPropertiesOnStart(
@InjectConfiguration("mock.properties = {"
+ PARTITION_DISTRIBUTION_RESET_TIMEOUT + ".propertyValue = \"5\"}")
+ PROPERTY_NAME + ".propertyValue = \"newValue\"}")
SystemDistributedConfiguration systemConfig
) {
var config = new DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
config.startAndInit();
var config = noopConfigHolder(systemConfig);

config.init();

assertEquals(5, config.partitionDistributionResetTimeoutSeconds());
assertEquals("newValue", config.currentValue());
}

@Test
void testValidSystemPropertiesOnChange(@InjectConfiguration SystemDistributedConfiguration systemConfig) {
var config = new DistributionZonesHighAvailabilityConfiguration(systemConfig, noOpConsumer);
config.startAndInit();
var config = noopConfigHolder(systemConfig);

changeSystemConfig(systemConfig, "10");
config.init();

changeSystemConfig(systemConfig, "newValue");

assertEquals(10, config.partitionDistributionResetTimeoutSeconds());
assertEquals("newValue", config.currentValue());
}

@Test
void testUpdateConfigListener(@InjectConfiguration SystemDistributedConfiguration systemConfig) throws InterruptedException {
AtomicReference<Integer> partitionDistributionResetTimeoutValue = new AtomicReference<>();
void testUpdateConfigListenerWithConverter(
@InjectConfiguration SystemDistributedConfiguration systemConfig
) throws InterruptedException {
AtomicReference<Integer> currentValue = new AtomicReference<>();
AtomicReference<Long> revisionValue = new AtomicReference<>();

var config = new DistributionZonesHighAvailabilityConfiguration(
var config = new SystemDistributedConfigurationPropertyHolder<>(
systemConfig,
(partitionDistributionResetTimeout, revision) -> {
partitionDistributionResetTimeoutValue.set(partitionDistributionResetTimeout);
(v, revision) -> {
currentValue.set(v);
revisionValue.set(revision);
}
},
PROPERTY_NAME,
0,
Integer::parseInt
);
config.startAndInit();
config.init();

assertNotEquals(10, partitionDistributionResetTimeoutValue.get());
assertNotEquals(10, currentValue.get());
assertNotEquals(1, revisionValue.get());

changeSystemConfig(systemConfig, "10");

assertTrue(waitForCondition(() ->
partitionDistributionResetTimeoutValue.get() != null
&& partitionDistributionResetTimeoutValue.get() == 10, 1_000));
currentValue.get() != null
&& currentValue.get().equals(10), 1_000));
assertEquals(1, revisionValue.get());
}

Expand All @@ -103,9 +117,19 @@ private static void changeSystemConfig(
String partitionDistributionReset
) {
CompletableFuture<Void> changeFuture = systemConfig.change(c0 -> c0.changeProperties()
.create(PARTITION_DISTRIBUTION_RESET_TIMEOUT, c1 -> c1.changePropertyValue(partitionDistributionReset))
.create(PROPERTY_NAME, c1 -> c1.changePropertyValue(partitionDistributionReset))
);

assertThat(changeFuture, willCompleteSuccessfully());
}

private static SystemDistributedConfigurationPropertyHolder<String> noopConfigHolder(SystemDistributedConfiguration systemConfig) {
return new SystemDistributedConfigurationPropertyHolder<>(
systemConfig,
noOpConsumer,
PROPERTY_NAME,
DEFAULT_VALUE,
Function.identity()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_ALTER;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_DROP;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForRecoverableStateChanges;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForZoneCreation;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.conditionForZoneRemoval;
Expand Down Expand Up @@ -98,8 +100,8 @@
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
import org.apache.ignite.internal.distributionzones.causalitydatanodes.CausalityDataNodesEngine;
import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesHighAvailabilityConfiguration;
import org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEvent;
import org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEventParams;
import org.apache.ignite.internal.distributionzones.exception.DistributionZoneNotFoundException;
Expand Down Expand Up @@ -210,7 +212,7 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
private final ScheduledExecutorService rebalanceScheduler;

/** Configuration of HA mode. */
private final DistributionZonesHighAvailabilityConfiguration configuration;
private final SystemDistributedConfigurationPropertyHolder<Integer> partitionDistributionResetTimeoutConfiguration;

/**
* Creates a new distribution zone manager.
Expand Down Expand Up @@ -264,9 +266,12 @@ public DistributionZoneManager(
catalogManager
);

configuration = new DistributionZonesHighAvailabilityConfiguration(
partitionDistributionResetTimeoutConfiguration = new SystemDistributedConfigurationPropertyHolder<>(
systemDistributedConfiguration,
this::onUpdatePartitionDistributionResetBusy
this::onUpdatePartitionDistributionResetBusy,
PARTITION_DISTRIBUTION_RESET_TIMEOUT,
PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE,
Integer::parseInt
);
}

Expand Down Expand Up @@ -296,7 +301,7 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
// fires CatalogManager's ZONE_CREATE event, and the state of DistributionZoneManager becomes consistent.
int catalogVersion = catalogManager.latestCatalogVersion();

configuration.start();
partitionDistributionResetTimeoutConfiguration.init();

return allOf(
createOrRestoreZonesStates(recoveryRevision, catalogVersion),
Expand Down Expand Up @@ -944,7 +949,7 @@ private CompletableFuture<Void> scheduleTimers(CatalogZoneDescriptor zone, boole
int autoAdjust = zone.dataNodesAutoAdjust();
int autoAdjustScaleDown = zone.dataNodesAutoAdjustScaleDown();
int autoAdjustScaleUp = zone.dataNodesAutoAdjustScaleUp();
int partitionReset = configuration.partitionDistributionResetTimeoutSeconds();
int partitionReset = partitionDistributionResetTimeoutConfiguration.currentValue();

int zoneId = zone.id();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,26 @@ public class DistributionZonesUtil {
private static final ByteArray DISTRIBUTION_ZONES_DATA_NODES_KEY =
new ByteArray(DISTRIBUTION_ZONE_DATA_NODES_PREFIX);

/**
* Internal property that determines partition group members reset timeout after the partition group majority loss.
*
* <p>Default value is {@link #PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE}.</p>
*/
public static final String PARTITION_DISTRIBUTION_RESET_TIMEOUT = "partitionDistributionResetTimeout";

/** Default value for the {@link #PARTITION_DISTRIBUTION_RESET_TIMEOUT}. */
static final int PARTITION_DISTRIBUTION_RESET_TIMEOUT_DEFAULT_VALUE = 0;

/**
* Internal property that determines delay between unsuccessful trial of a rebalance and a new trial, ms.
*
* <p>Default value is {@link #REBALANCE_RETRY_DELAY_DEFAULT}.</p>
*/
public static final String REBALANCE_RETRY_DELAY_MS = "rebalanceRetryDelay";

/** Default value for the {@link #REBALANCE_RETRY_DELAY_MS}. */
public static final int REBALANCE_RETRY_DELAY_DEFAULT = 200;

/**
* ByteArray representation of {@link DistributionZonesUtil#DISTRIBUTION_ZONE_DATA_NODES_VALUE_PREFIX}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.ignite.internal.distributionzones.configuration;

import static org.apache.ignite.internal.distributionzones.configuration.DistributionZonesHighAvailabilityConfiguration.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.REBALANCE_RETRY_DELAY_MS;

import com.google.auto.service.AutoService;
import java.util.Set;
Expand All @@ -37,7 +38,8 @@ public ConfigurationType type() {
@Override
public Set<Validator<?, ?>> validators() {
return Set.of(
new NonNegativeIntegerNumberSystemPropertyValueValidator(PARTITION_DISTRIBUTION_RESET_TIMEOUT)
new NonNegativeIntegerNumberSystemPropertyValueValidator(PARTITION_DISTRIBUTION_RESET_TIMEOUT),
new NonNegativeIntegerNumberSystemPropertyValueValidator(REBALANCE_RETRY_DELAY_MS)
);
}
}
Loading