-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Backport 2.x] Batch translog sync/upload per x ms for remote-backed …
…indexes (#5854) (#6066) * Batch translog sync/upload per x ms for remote-backed indexes (#5854) Signed-off-by: Ashish Singh <[email protected]> Co-authored-by: Ashwin Pankaj <[email protected]> Co-authored-by: Laxman Muttineni <[email protected]>
- Loading branch information
1 parent
13bfdd6
commit 79dbd2b
Showing
13 changed files
with
539 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
93 changes: 93 additions & 0 deletions
93
server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.common.util.concurrent; | ||
|
||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.common.collect.Tuple; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.function.Consumer; | ||
|
||
/** | ||
* A variant of {@link AsyncIOProcessor} that allows to batch and buffer processing items at every | ||
* {@link BufferedAsyncIOProcessor#bufferInterval} in a separate threadpool. | ||
* <p> | ||
* Requests are buffered till processor thread calls @{link drainAndProcessAndRelease} after bufferInterval. | ||
* If more requests are enqueued between invocations of drainAndProcessAndRelease, another processor thread | ||
* gets scheduled. Subsequent requests will get buffered till drainAndProcessAndRelease gets called in this new | ||
* processor thread. | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public abstract class BufferedAsyncIOProcessor<Item> extends AsyncIOProcessor<Item> { | ||
|
||
private final ThreadPool threadpool; | ||
private final TimeValue bufferInterval; | ||
|
||
protected BufferedAsyncIOProcessor( | ||
Logger logger, | ||
int queueSize, | ||
ThreadContext threadContext, | ||
ThreadPool threadpool, | ||
TimeValue bufferInterval | ||
) { | ||
super(logger, queueSize, threadContext); | ||
this.threadpool = threadpool; | ||
this.bufferInterval = bufferInterval; | ||
} | ||
|
||
@Override | ||
public void put(Item item, Consumer<Exception> listener) { | ||
Objects.requireNonNull(item, "item must not be null"); | ||
Objects.requireNonNull(listener, "listener must not be null"); | ||
addToQueue(item, listener); | ||
scheduleProcess(); | ||
} | ||
|
||
private void scheduleProcess() { | ||
if (getQueue().isEmpty() == false && getPromiseSemaphore().tryAcquire()) { | ||
try { | ||
threadpool.schedule(this::process, getBufferInterval(), getBufferRefreshThreadPoolName()); | ||
} catch (Exception e) { | ||
getLogger().error("failed to schedule process"); | ||
processSchedulingFailure(e); | ||
getPromiseSemaphore().release(); | ||
// This is to make sure that any new items that are added to the queue between processSchedulingFailure | ||
// and releasing the semaphore is handled by a subsequent refresh and not starved. | ||
scheduleProcess(); | ||
} | ||
} | ||
} | ||
|
||
private void processSchedulingFailure(Exception e) { | ||
List<Tuple<Item, Consumer<Exception>>> candidates = new ArrayList<>(); | ||
getQueue().drainTo(candidates); | ||
notifyList(candidates, e); | ||
} | ||
|
||
private void process() { | ||
drainAndProcessAndRelease(new ArrayList<>()); | ||
scheduleProcess(); | ||
} | ||
|
||
private TimeValue getBufferInterval() { | ||
long timeSinceLastRunStartInNS = System.nanoTime() - getLastRunStartTimeInNs(); | ||
if (timeSinceLastRunStartInNS >= bufferInterval.getNanos()) { | ||
return TimeValue.ZERO; | ||
} | ||
return TimeValue.timeValueNanos(bufferInterval.getNanos() - timeSinceLastRunStartInNS); | ||
} | ||
|
||
protected abstract String getBufferRefreshThreadPoolName(); | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.