Skip to content

Commit

Permalink
[Segment Replication] Add components for segment replication to perfo…
Browse files Browse the repository at this point in the history
…rm file copy. (#3525)

* Add components for segment replication to perform file copy.

This change adds the required components to SegmentReplicationSourceService to initiate copy and react to lifecycle events.
Along with new components it refactors common file copy code from RecoverySourceHandler into reusable pieces.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 authored Jun 20, 2022
1 parent 0b84016 commit 04e43a7
Show file tree
Hide file tree
Showing 21 changed files with 1,812 additions and 430 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices;

import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardRelocatedException;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.CompletableFuture;

/**
* Execute a Runnable after acquiring the primary's operation permit.
*
* @opensearch.internal
*/
public final class RunUnderPrimaryPermit {

public static void run(
CancellableThreads.Interruptible runnable,
String reason,
IndexShard primary,
CancellableThreads cancellableThreads,
Logger logger
) {
cancellableThreads.execute(() -> {
CompletableFuture<Releasable> permit = new CompletableFuture<>();
final ActionListener<Releasable> onAcquired = new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
if (permit.complete(releasable) == false) {
releasable.close();
}
}

@Override
public void onFailure(Exception e) {
permit.completeExceptionally(e);
}
};
primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason);
try (Releasable ignored = FutureUtils.get(permit)) {
// check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent
// races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated()
if (primary.isRelocatedPrimary()) {
throw new IndexShardRelocatedException(primary.shardId());
}
runnable.run();
} finally {
// just in case we got an exception (likely interrupted) while waiting for the get
permit.whenComplete((r, e) -> {
if (r != null) {
r.close();
}
if (e != null) {
logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e);
}
});
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.recovery;

import org.opensearch.action.ActionListener;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.index.store.StoreFileMetadata;

/**
* Writes a partial file chunk to the target store.
*
* @opensearch.internal
*/
@FunctionalInterface
public interface FileChunkWriter {

void writeFileChunk(
StoreFileMetadata fileMetadata,
long position,
BytesReference content,
boolean lastChunk,
int totalTranslogOps,
ActionListener<Void> listener
);
}
Loading

0 comments on commit 04e43a7

Please sign in to comment.