Skip to content

Commit

Permalink
Cherry-picking opensearch-project#3525.
Browse files Browse the repository at this point in the history
Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
mch2 authored and Rishikesh1159 committed Aug 9, 2022
1 parent 313b919 commit ab2a145
Show file tree
Hide file tree
Showing 21 changed files with 1,812 additions and 430 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices;

import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardRelocatedException;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.CompletableFuture;

/**
* Execute a Runnable after acquiring the primary's operation permit.
*
* @opensearch.internal
*/
public final class RunUnderPrimaryPermit {

public static void run(
CancellableThreads.Interruptible runnable,
String reason,
IndexShard primary,
CancellableThreads cancellableThreads,
Logger logger
) {
cancellableThreads.execute(() -> {
CompletableFuture<Releasable> permit = new CompletableFuture<>();
final ActionListener<Releasable> onAcquired = new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
if (permit.complete(releasable) == false) {
releasable.close();
}
}

@Override
public void onFailure(Exception e) {
permit.completeExceptionally(e);
}
};
primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
try (Releasable ignored = FutureUtils.get(permit)) {
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
// races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
if (primary.isRelocatedPrimary()) {
throw new IndexShardRelocatedException(primary.shardId());
}
runnable.run();
} finally {
// just in case we got an exception (likely interrupted) while waiting for the get
permit.whenComplete((r, e) -> {
if (r != null) {
r.close();
}
if (e != null) {
logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
}
});
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.recovery;

import org.opensearch.action.ActionListener;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.index.store.StoreFileMetadata;

/**
* Writes a partial file chunk to the target store.
*
* @opensearch.internal
*/
@FunctionalInterface
public interface FileChunkWriter {

void writeFileChunk(
StoreFileMetadata fileMetadata,
long position,
BytesReference content,
boolean lastChunk,
int totalTranslogOps,
ActionListener<Void> listener
);
}
Loading

0 comments on commit ab2a145

Please sign in to comment.