From 807ccef8b60e6be5b6d312e9ea06a38c7de1b356 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Wed, 19 Apr 2023 00:01:49 +0530 Subject: [PATCH] Register settings and add unit tests Signed-off-by: Ashish Singh --- .../common/settings/ClusterSettings.java | 13 +- ...oteRefreshSegmentPressureSettingsTest.java | 268 ++++++++++++++++++ 2 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 server/src/test/java/org/opensearch/index/RemoteRefreshSegmentPressureSettingsTest.java diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index de3fd1c9380b9..4c3272e53379c 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -39,6 +39,7 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; import org.opensearch.index.IndexingPressure; +import org.opensearch.index.RemoteRefreshSegmentPressureSettings; import org.opensearch.index.SegmentReplicationPressureService; import org.opensearch.index.ShardIndexingPressureMemoryManager; import org.opensearch.index.ShardIndexingPressureSettings; @@ -635,7 +636,17 @@ public void apply(Settings value, Settings current, Settings previous) { SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS, // Settings related to Searchable Snapshots - Node.NODE_SEARCH_CACHE_SIZE_SETTING + Node.NODE_SEARCH_CACHE_SIZE_SETTING, + + // Settings related to Remote Refresh Segment Pressure + RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED, + RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT, + RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_THRESHOLD, + RemoteRefreshSegmentPressureSettings.TIME_LAG_VARIANCE_THRESHOLD, + RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT, + RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE, + RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE, + RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE ) ) ); diff --git a/server/src/test/java/org/opensearch/index/RemoteRefreshSegmentPressureSettingsTest.java b/server/src/test/java/org/opensearch/index/RemoteRefreshSegmentPressureSettingsTest.java new file mode 100644 index 0000000000000..2f58403584a27 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/RemoteRefreshSegmentPressureSettingsTest.java @@ -0,0 +1,268 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +public class RemoteRefreshSegmentPressureSettingsTest extends OpenSearchTestCase { + + private ClusterService clusterService; + + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("remote_refresh_segment_pressure_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testGetDefaultSettings() { + RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + Settings.EMPTY, + mock(RemoteRefreshSegmentPressureService.class) + ); + + // Check remote refresh segment pressure enabled is false + assertFalse(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); + + // Check min sequence number lag limit default value + assertEquals(5L, pressureSettings.getMinSeqNoLagLimit()); + + // Check bytes lag variance threshold default value + assertEquals(2.0, pressureSettings.getBytesLagVarianceThreshold(), 0.0d); + + // Check time lag variance threshold default value + assertEquals(2.0, pressureSettings.getTimeLagVarianceThreshold(), 0.0d); + + // Check minimum consecutive failures limit default value + assertEquals(10, pressureSettings.getMinConsecutiveFailuresLimit()); + + // Check upload bytes moving average window size default value + assertEquals(20, pressureSettings.getUploadBytesMovingAverageWindowSize()); + + // Check upload bytes per sec moving average window size default value + assertEquals(20, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); + + // Check upload time moving average window size default value + assertEquals(20, pressureSettings.getUploadTimeMovingAverageWindowSize()); + } + + public void testGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) + .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_THRESHOLD.getKey(), 50.0) + .put(RemoteRefreshSegmentPressureSettings.TIME_LAG_VARIANCE_THRESHOLD.getKey(), 60.0) + .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .build(); + RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + settings, + mock(RemoteRefreshSegmentPressureService.class) + ); + + // Check remote refresh segment pressure enabled is true + assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); + + // Check min sequence number lag limit configured value + assertEquals(100L, pressureSettings.getMinSeqNoLagLimit()); + + // Check bytes lag variance threshold configured value + assertEquals(50.0, pressureSettings.getBytesLagVarianceThreshold(), 0.0d); + + // Check time lag variance threshold configured value + assertEquals(60.0, pressureSettings.getTimeLagVarianceThreshold(), 0.0d); + + // Check minimum consecutive failures limit configured value + assertEquals(121, pressureSettings.getMinConsecutiveFailuresLimit()); + + // Check upload bytes moving average window size configured value + assertEquals(102, pressureSettings.getUploadBytesMovingAverageWindowSize()); + + // Check upload bytes per sec moving average window size configured value + assertEquals(103, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); + + // Check upload time moving average window size configured value + assertEquals(104, pressureSettings.getUploadTimeMovingAverageWindowSize()); + } + + public void testUpdateAfterGetDefaultSettings() { + RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + Settings.EMPTY, + mock(RemoteRefreshSegmentPressureService.class) + ); + + Settings newSettings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) + .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_THRESHOLD.getKey(), 50.0) + .put(RemoteRefreshSegmentPressureSettings.TIME_LAG_VARIANCE_THRESHOLD.getKey(), 60.0) + .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + + // Check updated remote refresh segment pressure enabled is false + assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); + + // Check min sequence number lag limit + assertEquals(100L, pressureSettings.getMinSeqNoLagLimit()); + + // Check bytes lag variance threshold updated + assertEquals(50.0, pressureSettings.getBytesLagVarianceThreshold(), 0.0d); + + // Check time lag variance threshold updated + assertEquals(60.0, pressureSettings.getTimeLagVarianceThreshold(), 0.0d); + + // Check minimum consecutive failures limit updated + assertEquals(121, pressureSettings.getMinConsecutiveFailuresLimit()); + + // Check upload bytes moving average window size updated + assertEquals(102, pressureSettings.getUploadBytesMovingAverageWindowSize()); + + // Check upload bytes per sec moving average window size updated + assertEquals(103, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); + + // Check upload time moving average window size updated + assertEquals(104, pressureSettings.getUploadTimeMovingAverageWindowSize()); + } + + public void testUpdateAfterGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true) + .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 100) + .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_THRESHOLD.getKey(), 50.0) + .put(RemoteRefreshSegmentPressureSettings.TIME_LAG_VARIANCE_THRESHOLD.getKey(), 60.0) + .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 121) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 102) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 103) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 104) + .build(); + RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + settings, + mock(RemoteRefreshSegmentPressureService.class) + ); + + Settings newSettings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.MIN_SEQ_NO_LAG_LIMIT.getKey(), 80) + .put(RemoteRefreshSegmentPressureSettings.BYTES_LAG_VARIANCE_THRESHOLD.getKey(), 40.0) + .put(RemoteRefreshSegmentPressureSettings.TIME_LAG_VARIANCE_THRESHOLD.getKey(), 50.0) + .put(RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 111) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 112) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 113) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), 114) + .build(); + + clusterService.getClusterSettings().applySettings(newSettings); + + // Check updated remote refresh segment pressure enabled is true + assertTrue(pressureSettings.isRemoteRefreshSegmentPressureEnabled()); + + // Check min sequence number lag limit + assertEquals(80L, pressureSettings.getMinSeqNoLagLimit()); + + // Check bytes lag variance threshold updated + assertEquals(40.0, pressureSettings.getBytesLagVarianceThreshold(), 0.0d); + + // Check time lag variance threshold updated + assertEquals(50.0, pressureSettings.getTimeLagVarianceThreshold(), 0.0d); + + // Check minimum consecutive failures limit updated + assertEquals(111, pressureSettings.getMinConsecutiveFailuresLimit()); + + // Check upload bytes moving average window size updated + assertEquals(112, pressureSettings.getUploadBytesMovingAverageWindowSize()); + + // Check upload bytes per sec moving average window size updated + assertEquals(113, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); + + // Check upload time moving average window size updated + assertEquals(114, pressureSettings.getUploadTimeMovingAverageWindowSize()); + } + + public void testUpdateTriggeredInRemotePressureServiceOnUpdateSettings() { + + int toUpdateVal1 = 1121, toUpdateVal2 = 1123, toUpdateVal3 = 1125; + + AtomicInteger updatedUploadBytesWindowSize = new AtomicInteger(); + AtomicInteger updatedUploadBytesPerSecWindowSize = new AtomicInteger(); + AtomicInteger updatedUploadTimeWindowSize = new AtomicInteger(); + + + RemoteRefreshSegmentPressureService pressureService = mock(RemoteRefreshSegmentPressureService.class); + + // Upload bytes + doAnswer(invocation -> { + updatedUploadBytesWindowSize.set(invocation.getArgument(0)); + return null; + }).when(pressureService).updateUploadBytesMovingAverageWindowSize(anyInt()); + + // Upload bytes per sec + doAnswer(invocation -> { + updatedUploadBytesPerSecWindowSize.set(invocation.getArgument(0)); + return null; + }).when(pressureService).updateUploadBytesPerSecMovingAverageWindowSize(anyInt()); + + // Upload time + doAnswer(invocation -> { + updatedUploadTimeWindowSize.set(invocation.getArgument(0)); + return null; + }).when(pressureService).updateUploadTimeMsMovingAverageWindowSize(anyInt()); + + RemoteRefreshSegmentPressureSettings pressureSettings = new RemoteRefreshSegmentPressureSettings( + clusterService, + Settings.EMPTY, + pressureService + ); + Settings newSettings = Settings.builder() + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal1) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal2) + .put(RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE.getKey(), toUpdateVal3) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + + // Assertions + assertEquals(toUpdateVal1, pressureSettings.getUploadBytesMovingAverageWindowSize()); + assertEquals(toUpdateVal1, updatedUploadBytesWindowSize.get()); + assertEquals(toUpdateVal2, pressureSettings.getUploadBytesPerSecMovingAverageWindowSize()); + assertEquals(toUpdateVal2, updatedUploadBytesPerSecWindowSize.get()); + assertEquals(toUpdateVal3, pressureSettings.getUploadTimeMovingAverageWindowSize()); + assertEquals(toUpdateVal3, updatedUploadTimeWindowSize.get()); + } +}