diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 428b4ef15a043..8d718c9ff6006 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -59,6 +59,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.ToXContent; import org.opensearch.common.xcontent.ToXContentFragment; import org.opensearch.common.xcontent.XContentBuilder; @@ -301,6 +302,8 @@ public Iterator> settings() { public static final String SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY = "index.remote_store.translog.repository"; + public static final String SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL = "index.remote_store.translog.buffer_interval"; + /** * Used to specify if the index data should be persisted in the remote store. */ @@ -446,6 +449,45 @@ public Iterator> settings() { Property.Final ); + public static final Setting INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting( + SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, + TimeValue.timeValueMillis(100), + TimeValue.timeValueMillis(50), + new Setting.Validator<>() { + + @Override + public void validate(final TimeValue value) {} + + @Override + public void validate(final TimeValue value, final Map, Object> settings) { + if (value == null) { + throw new IllegalArgumentException( + "Setting " + SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL + " should be provided with a valid time value" + ); + } else { + final Boolean isRemoteTranslogStoreEnabled = (Boolean) settings.get(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING); + if (isRemoteTranslogStoreEnabled == null || isRemoteTranslogStoreEnabled == false) { + throw new IllegalArgumentException( + "Setting " + + SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL + + " can only be set when " + + SETTING_REMOTE_TRANSLOG_STORE_ENABLED + + " is set to true" + ); + } + } + } + + @Override + public Iterator> settings() { + final List> settings = Collections.singletonList(INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING); + return settings.iterator(); + } + }, + Property.IndexScope, + Property.Final + ); + public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas"; public static final Setting INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING; diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 1efce2eba8867..a81c330177129 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -227,7 +227,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING, IndexMetadata.INDEX_REMOTE_STORE_REPOSITORY_SETTING, IndexMetadata.INDEX_REMOTE_TRANSLOG_STORE_ENABLED_SETTING, - IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING + IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING, + IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING ), FeatureFlags.SEARCHABLE_SNAPSHOT, List.of( diff --git a/server/src/main/java/org/opensearch/common/settings/Setting.java b/server/src/main/java/org/opensearch/common/settings/Setting.java index f86fe6771dfcd..176f24e3cc1ff 100644 --- a/server/src/main/java/org/opensearch/common/settings/Setting.java +++ b/server/src/main/java/org/opensearch/common/settings/Setting.java @@ -2173,6 +2173,23 @@ public static Setting timeSetting(String key, Setting fall return new Setting<>(key, fallbackSetting, (s) -> TimeValue.parseTimeValue(s, key), properties); } + public static Setting timeSetting( + String key, + TimeValue defaultValue, + TimeValue minValue, + Validator validator, + Property... properties + ) { + final SimpleKey simpleKey = new SimpleKey(key); + return new Setting<>( + simpleKey, + s -> defaultValue.getStringRep(), + minTimeValueParser(key, minValue, isFiltered(properties)), + validator, + properties + ); + } + public static Setting timeSetting( String key, Setting fallBackSetting, diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java index 72cc0f5ee21d2..e9b9442c555e5 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/AsyncIOProcessor.java @@ -57,6 +57,7 @@ public abstract class AsyncIOProcessor { private final ArrayBlockingQueue>> queue; private final ThreadContext threadContext; private final Semaphore promiseSemaphore = new Semaphore(1); + private long lastRunStartTimeInNs; protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadContext) { this.logger = logger; @@ -67,7 +68,7 @@ protected AsyncIOProcessor(Logger logger, int queueSize, ThreadContext threadCon /** * Adds the given item to the queue. The listener is notified once the item is processed */ - public final void put(Item item, Consumer listener) { + public void put(Item item, Consumer listener) { Objects.requireNonNull(item, "item must not be null"); Objects.requireNonNull(listener, "listener must not be null"); // the algorithm here tires to reduce the load on each individual caller. @@ -78,12 +79,7 @@ public final void put(Item item, Consumer listener) { final boolean promised = promiseSemaphore.tryAcquire(); if (promised == false) { // in this case we are not responsible and can just block until there is space - try { - queue.put(new Tuple<>(item, preserveContext(listener))); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - listener.accept(e); - } + addToQueue(item, listener); } // here we have to try to make the promise again otherwise there is a race when a thread puts an entry without making the promise @@ -104,7 +100,17 @@ public final void put(Item item, Consumer listener) { } } - private void drainAndProcessAndRelease(List>> candidates) { + void addToQueue(Item item, Consumer listener) { + try { + queue.put(new Tuple<>(item, preserveContext(listener))); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + listener.accept(e); + } + } + + void drainAndProcessAndRelease(List>> candidates) { + lastRunStartTimeInNs = System.nanoTime(); Exception exception; try { queue.drainTo(candidates); @@ -130,7 +136,7 @@ private Exception processList(List>> candidates) return exception; } - private void notifyList(List>> candidates, Exception exception) { + void notifyList(List>> candidates, Exception exception) { for (Tuple> tuple : candidates) { Consumer consumer = tuple.v2(); try { @@ -141,7 +147,7 @@ private void notifyList(List>> candidates, Excep } } - private Consumer preserveContext(Consumer consumer) { + Consumer preserveContext(Consumer consumer) { Supplier restorableContext = threadContext.newRestorableContext(false); return e -> { try (ThreadContext.StoredContext ignore = restorableContext.get()) { @@ -154,4 +160,20 @@ private Consumer preserveContext(Consumer consumer) { * Writes or processes the items out or to disk. */ protected abstract void write(List>> candidates) throws IOException; + + Logger getLogger() { + return logger; + } + + Semaphore getPromiseSemaphore() { + return promiseSemaphore; + } + + long getLastRunStartTimeInNs() { + return lastRunStartTimeInNs; + } + + ArrayBlockingQueue>> getQueue() { + return queue; + } } diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java new file mode 100644 index 0000000000000..e3157ea8dfdb4 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -0,0 +1,93 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +import org.apache.logging.log4j.Logger; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; + +/** + * A variant of {@link AsyncIOProcessor} that allows to batch and buffer processing items at every + * {@link BufferedAsyncIOProcessor#bufferInterval} in a separate threadpool. + *

+ * Requests are buffered till processor thread calls @{link drainAndProcessAndRelease} after bufferInterval. + * If more requests are enqueued between invocations of drainAndProcessAndRelease, another processor thread + * gets scheduled. Subsequent requests will get buffered till drainAndProcessAndRelease gets called in this new + * processor thread. + * + * @opensearch.internal + */ +public abstract class BufferedAsyncIOProcessor extends AsyncIOProcessor { + + private final ThreadPool threadpool; + private final TimeValue bufferInterval; + + protected BufferedAsyncIOProcessor( + Logger logger, + int queueSize, + ThreadContext threadContext, + ThreadPool threadpool, + TimeValue bufferInterval + ) { + super(logger, queueSize, threadContext); + this.threadpool = threadpool; + this.bufferInterval = bufferInterval; + } + + @Override + public void put(Item item, Consumer listener) { + Objects.requireNonNull(item, "item must not be null"); + Objects.requireNonNull(listener, "listener must not be null"); + addToQueue(item, listener); + scheduleProcess(); + } + + private void scheduleProcess() { + if (getQueue().isEmpty() == false && getPromiseSemaphore().tryAcquire()) { + try { + threadpool.schedule(this::process, getBufferInterval(), getBufferRefreshThreadPoolName()); + } catch (Exception e) { + getLogger().error("failed to schedule process"); + processSchedulingFailure(e); + getPromiseSemaphore().release(); + // This is to make sure that any new items that are added to the queue between processSchedulingFailure + // and releasing the semaphore is handled by a subsequent refresh and not starved. + scheduleProcess(); + } + } + } + + private void processSchedulingFailure(Exception e) { + List>> candidates = new ArrayList<>(); + getQueue().drainTo(candidates); + notifyList(candidates, e); + } + + private void process() { + drainAndProcessAndRelease(new ArrayList<>()); + scheduleProcess(); + } + + private TimeValue getBufferInterval() { + long timeSinceLastRunStartInNS = System.nanoTime() - getLastRunStartTimeInNs(); + if (timeSinceLastRunStartInNS >= bufferInterval.getNanos()) { + return TimeValue.ZERO; + } + return TimeValue.timeValueNanos(bufferInterval.getNanos() - timeSinceLastRunStartInNS); + } + + protected abstract String getBufferRefreshThreadPoolName(); + +} diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 3a4c60167754f..4791a23d2ecb5 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -587,6 +587,7 @@ public final class IndexSettings { private final ReplicationType replicationType; private final boolean isRemoteStoreEnabled; private final boolean isRemoteTranslogStoreEnabled; + private final TimeValue remoteTranslogUploadBufferInterval; private final String remoteStoreTranslogRepository; private final String remoteStoreRepository; private final boolean isRemoteSnapshot; @@ -753,6 +754,10 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti isRemoteStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false); isRemoteTranslogStoreEnabled = settings.getAsBoolean(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false); remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY); + remoteTranslogUploadBufferInterval = settings.getAsTime( + IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, + TimeValue.timeValueMillis(100) + ); remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY); isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings); @@ -1134,6 +1139,15 @@ public void setTranslogSyncInterval(TimeValue translogSyncInterval) { this.syncInterval = translogSyncInterval; } + /** + * Returns the translog sync/upload buffer interval when remote translog store is enabled and index setting + * {@code index.translog.durability} is set as {@code request}. + * @return the buffer interval. + */ + public TimeValue getRemoteTranslogUploadBufferInterval() { + return remoteTranslogUploadBufferInterval; + } + /** * Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled. */ diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index ea121659f95e0..28a7f0893ce2a 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -95,6 +95,7 @@ import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.AsyncIOProcessor; +import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor; import org.opensearch.common.util.concurrent.RunOnce; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.set.Sets; @@ -365,7 +366,13 @@ public IndexShard( this.indexSortSupplier = indexSortSupplier; this.indexEventListener = indexEventListener; this.threadPool = threadPool; - this.translogSyncProcessor = createTranslogSyncProcessor(logger, threadPool.getThreadContext(), this::getEngine); + this.translogSyncProcessor = createTranslogSyncProcessor( + logger, + threadPool, + this::getEngine, + indexSettings.isRemoteTranslogStoreEnabled(), + indexSettings.getRemoteTranslogUploadBufferInterval() + ); this.mapperService = mapperService; this.indexCache = indexCache; this.internalIndexingStats = new InternalIndexingStats(); @@ -3822,21 +3829,41 @@ public List getActiveOperations() { private static AsyncIOProcessor createTranslogSyncProcessor( Logger logger, - ThreadContext threadContext, - Supplier engineSupplier + ThreadPool threadPool, + Supplier engineSupplier, + boolean bufferAsyncIoProcessor, + TimeValue bufferInterval ) { - return new AsyncIOProcessor(logger, 1024, threadContext) { + ThreadContext threadContext = threadPool.getThreadContext(); + CheckedConsumer>>, IOException> writeConsumer = candidates -> { + try { + engineSupplier.get().ensureTranslogSynced(candidates.stream().map(Tuple::v1)); + } catch (AlreadyClosedException ex) { + // that's fine since we already synced everything on engine close - this also is conform with the methods + // documentation + } catch (IOException ex) { // if this fails we are in deep shit - fail the request + logger.debug("failed to sync translog", ex); + throw ex; + } + }; + + if (bufferAsyncIoProcessor) { + return new BufferedAsyncIOProcessor<>(logger, 102400, threadContext, threadPool, bufferInterval) { + @Override + protected void write(List>> candidates) throws IOException { + writeConsumer.accept(candidates); + } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } + }; + } + return new AsyncIOProcessor<>(logger, 1024, threadContext) { @Override protected void write(List>> candidates) throws IOException { - try { - engineSupplier.get().ensureTranslogSynced(candidates.stream().map(Tuple::v1)); - } catch (AlreadyClosedException ex) { - // that's fine since we already synced everything on engine close - this also is conform with the methods - // documentation - } catch (IOException ex) { // if this fails we are in deep shit - fail the request - logger.debug("failed to sync translog", ex); - throw ex; - } + writeConsumer.accept(candidates); } }; } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 3cc73c864a5c9..710e5275645d4 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -108,6 +108,7 @@ public static class Names { public static final String SYSTEM_READ = "system_read"; public static final String SYSTEM_WRITE = "system_write"; public static final String TRANSLOG_TRANSFER = "translog_transfer"; + public static final String TRANSLOG_SYNC = "translog_sync"; } /** @@ -173,6 +174,7 @@ public static ThreadPoolType fromType(String type) { map.put(Names.SYSTEM_READ, ThreadPoolType.FIXED); map.put(Names.SYSTEM_WRITE, ThreadPoolType.FIXED); map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING); + map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); } @@ -261,6 +263,7 @@ public ThreadPool( Names.TRANSLOG_TRANSFER, new ScalingExecutorBuilder(Names.TRANSLOG_TRANSFER, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)) ); + builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000)); for (final ExecutorBuilder builder : customBuilders) { if (builders.containsKey(builder.name())) { diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java new file mode 100644 index 0000000000000..3ec7bdc54d3cb --- /dev/null +++ b/server/src/test/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessorTests.java @@ -0,0 +1,252 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util.concurrent; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.lang.Thread.sleep; + +public class BufferedAsyncIOProcessorTests extends OpenSearchTestCase { + + private ThreadPool threadpool; + private ThreadContext threadContext; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadpool = new TestThreadPool("buffered-async-io"); + threadContext = new ThreadContext(Settings.EMPTY); + } + + @After + public void cleanup() { + terminate(threadpool); + } + + public void testConsumerCanThrowExceptions() { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + + AsyncIOProcessor processor = new BufferedAsyncIOProcessor<>( + logger, + scaledRandomIntBetween(1, 2024), + threadContext, + threadpool, + TimeValue.timeValueMillis(50) + ) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } + }; + processor.put(new Object(), (e) -> { + notified.incrementAndGet(); + throw new RuntimeException(); + }); + processor.put(new Object(), (e) -> { + notified.incrementAndGet(); + throw new RuntimeException(); + }); + try { + sleep(200); // Give the processor few chances to run + } catch (InterruptedException e) { + logger.error("Error while trying to sleep", e); + } + assertEquals(2, notified.get()); + assertEquals(2, received.get()); + } + + public void testPreserveThreadContext() throws InterruptedException { + final int threadCount = randomIntBetween(2, 10); + final String testHeader = "test-header"; + + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + + CountDownLatch processed = new CountDownLatch(threadCount); + AsyncIOProcessor processor = new BufferedAsyncIOProcessor<>( + logger, + scaledRandomIntBetween(1, 2024), + threadContext, + threadpool, + TimeValue.timeValueMillis(100) + ) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } + }; + + // all threads should be non-blocking. + List threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(getTestName() + "_" + i) { + private final String response = randomAlphaOfLength(10); + + { + setDaemon(true); + } + + @Override + public void run() { + threadContext.addResponseHeader(testHeader, response); + processor.put(new Object(), (e) -> { + final Map> expected = Collections.singletonMap(testHeader, Collections.singletonList(response)); + assertEquals(expected, threadContext.getResponseHeaders()); + notified.incrementAndGet(); + processed.countDown(); + }); + } + }).collect(Collectors.toList()); + threads.forEach(Thread::start); + threads.forEach(t -> { + try { + t.join(20000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + assertTrue(processed.await(1, TimeUnit.SECONDS)); + + assertEquals(threadCount, notified.get()); + assertEquals(threadCount, received.get()); + threads.forEach(t -> assertFalse(t.isAlive())); + } + + public void testSlowConsumer() { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + + AsyncIOProcessor processor = new BufferedAsyncIOProcessor<>( + logger, + scaledRandomIntBetween(1, 2024), + threadContext, + threadpool, + TimeValue.timeValueMillis(100) + ) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } + }; + + int threadCount = randomIntBetween(2, 10); + Semaphore serializePutSemaphore = new Semaphore(1); + CountDownLatch allDone = new CountDownLatch(threadCount); + List threads = IntStream.range(0, threadCount).mapToObj(i -> new Thread(getTestName() + "_" + i) { + { + setDaemon(true); + } + + @Override + public void run() { + try { + assertTrue(serializePutSemaphore.tryAcquire(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + processor.put(new Object(), (e) -> { + serializePutSemaphore.release(); + notified.incrementAndGet(); + allDone.countDown(); + }); + } + }).collect(Collectors.toList()); + threads.forEach(Thread::start); + threads.forEach(t -> { + try { + t.join(20000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + try { + assertTrue(allDone.await(20000, TimeUnit.MILLISECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertEquals(threadCount, notified.get()); + assertEquals(threadCount, received.get()); + threads.forEach(t -> assertFalse(t.isAlive())); + } + + public void testConsecutiveWritesAtLeastBufferIntervalAway() throws InterruptedException { + AtomicInteger received = new AtomicInteger(0); + AtomicInteger notified = new AtomicInteger(0); + long bufferIntervalMs = randomLongBetween(50, 150); + List writeInvocationTimes = new LinkedList<>(); + + AsyncIOProcessor processor = new BufferedAsyncIOProcessor<>( + logger, + scaledRandomIntBetween(1, 2024), + threadContext, + threadpool, + TimeValue.timeValueMillis(bufferIntervalMs) + ) { + @Override + protected void write(List>> candidates) throws IOException { + received.addAndGet(candidates.size()); + writeInvocationTimes.add(System.nanoTime()); + } + + @Override + protected String getBufferRefreshThreadPoolName() { + return ThreadPool.Names.TRANSLOG_SYNC; + } + }; + + int runCount = randomIntBetween(3, 10); + CountDownLatch processed = new CountDownLatch(runCount); + IntStream.range(0, runCount).forEach(i -> { + processor.put(new Object(), (e) -> { + notified.incrementAndGet(); + processed.countDown(); + }); + }); + assertTrue(processed.await(bufferIntervalMs * (runCount + 1), TimeUnit.MILLISECONDS)); + assertEquals(runCount, notified.get()); + assertEquals(runCount, received.get()); + for (int i = 1; i < writeInvocationTimes.size(); i++) { + assertTrue(writeInvocationTimes.get(i) - writeInvocationTimes.get(i - 1) >= bufferIntervalMs * 1_000_000); + } + } +} diff --git a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java index 1b5ef7369419f..b266335f3d572 100644 --- a/server/src/test/java/org/opensearch/index/IndexSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/IndexSettingsTests.java @@ -948,6 +948,47 @@ public void testSetRemoteRepositoryFailsWhenEmptyString() { assertEquals("Setting index.remote_store.repository should be provided with non-empty repository ID", iae.getMessage()); } + public void testSetRemoteTranslogBufferIntervalDefaultSetting() { + Version createdVersion = VersionUtils.randomVersionBetween(random(), Version.V_2_0_0, Version.CURRENT); + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), createdVersion) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .build(); + assertEquals(TimeValue.timeValueMillis(100), IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings)); + } + + public void testSetRemoteTranslogBufferIntervalFailsWhenRemoteTranslogIsNotEnabled() { + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "200ms") + .build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(indexSettings) + ); + assertEquals( + "Setting index.remote_store.translog.buffer_interval can only be set when index.remote_store.translog.enabled is set to true", + iae.getMessage() + ); + } + + public void testSetRemoteTranslogBufferIntervalFailsWhenEmpty() { + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "") + .build(); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> IndexMetadata.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(indexSettings) + ); + assertEquals( + "failed to parse setting [index.remote_store.translog.buffer_interval] with value [] as a time value: unit is missing or unrecognized", + iae.getMessage() + ); + } + @SuppressForbidden(reason = "sets the SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY feature flag") public void testExtendedCompatibilityVersionForRemoteSnapshot() throws Exception { try (FeatureFlagSetter f = FeatureFlagSetter.set(FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) { diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index 12c3305e5f051..ecbbaf79a3687 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -36,6 +36,7 @@ public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchI .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "100ms") .build(); public void testStartSequenceForReplicaRecovery() throws Exception { diff --git a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index 465629406b54b..33c748f46bd86 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -23,6 +23,7 @@ public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLe .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_BUFFER_INTERVAL, "100ms") .build(); public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java index cd941feb37002..eca5a8eb19e47 100644 --- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java +++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java @@ -133,6 +133,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceAllocatedProcessors); sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessorsMaxTen); + sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n); return sizes.get(threadPoolName).apply(numberOfProcessors); }