Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make recovery source send operations non-blocking #37503

Merged
merged 9 commits into from
Jan 17, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -71,7 +71,7 @@
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -514,97 +514,121 @@ TimeValue prepareTargetForTranslog(final boolean fileBasedRecovery, final int to
*/
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes, ActionListener<SendSnapshotResult> listener) throws IOException {
ActionListener.completeWith(listener, () -> sendSnapshotBlockingly(
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes));
}

private SendSnapshotResult sendSnapshotBlockingly(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
long maxSeqNoOfUpdatesOrDeletes) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
"startingSeqNo " + startingSeqNo + " is larger than requiredSeqNoRangeStart " + requiredSeqNoRangeStart;
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}

final StopWatch stopWatch = new StopWatch().start();

logger.trace("recovery [phase2]: sending transaction log operations (seq# from [" + startingSeqNo + "], " +
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");

int ops = 0;
long size = 0;
int skippedOps = 0;
int totalSentOps = 0;
final AtomicLong targetLocalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final List<Translog.Operation> operations = new ArrayList<>();
final AtomicInteger skippedOps = new AtomicInteger();
final AtomicInteger totalSentOps = new AtomicInteger();
final LocalCheckpointTracker requiredOpsTracker = new LocalCheckpointTracker(endingSeqNo, requiredSeqNoRangeStart - 1);

final int expectedTotalOps = snapshot.totalOperations();
if (expectedTotalOps == 0) {
logger.trace("no translog operations to send");
}

final CancellableThreads.IOInterruptible sendBatch = () -> {
// TODO: Make this non-blocking
final PlainActionFuture<Long> future = new PlainActionFuture<>();
recoveryTarget.indexTranslogOperations(
operations, expectedTotalOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, future);
targetLocalCheckpoint.set(future.actionGet());
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through sendBatch.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible.
final Translog.Snapshot wrappedSnapshot = synchronizedSnapshot(snapshot);
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
final CheckedSupplier<List<Translog.Operation>, IOException> readNextBatch = () -> {
final List<Translog.Operation> operations = new ArrayList<>();
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
long batchSizeInBytes = 0L;
Translog.Operation operation;
while ((operation = wrappedSnapshot.next()) != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
final long seqNo = operation.seqNo();
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
skippedOps.incrementAndGet();
continue;
}
operations.add(operation);
batchSizeInBytes += operation.estimateSize();
totalSentOps.incrementAndGet();
requiredOpsTracker.markSeqNoAsCompleted(seqNo);

// check if this request is past bytes threshold, and if so, send it off
if (batchSizeInBytes >= chunkSizeInBytes) {
break;
}
}
return operations;
};

// send operations in batches
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
final StopWatch stopWatch = new StopWatch().start();
final ActionListener<Long> batchedListener = ActionListener.wrap(
targetLocalCheckpoint -> {
assert wrappedSnapshot.totalOperations() == wrappedSnapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
wrappedSnapshot.totalOperations(), wrappedSnapshot.skippedOperations(), skippedOps.get(), totalSentOps.get());
if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
}
stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase2]: took [{}]", tookTime);
listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime));
},
listener::onFailure
);

sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, wrappedSnapshot.totalOperations(),
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener);
}

final long seqNo = operation.seqNo();
if (seqNo < startingSeqNo || seqNo > endingSeqNo) {
skippedOps++;
continue;
}
operations.add(operation);
ops++;
size += operation.estimateSize();
totalSentOps++;
requiredOpsTracker.markSeqNoAsCompleted(seqNo);

// check if this request is past bytes threshold, and if so, send it off
if (size >= chunkSizeInBytes) {
cancellableThreads.executeIO(sendBatch);
logger.trace("sent batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
ops = 0;
size = 0;
operations.clear();
}
/**
 * Pulls the next batch of translog operations from {@code nextBatch} and hands it to the recovery target.
 * <p>
 * Each time the target acknowledges a batch, this method invokes itself again for the following batch, carrying
 * forward {@code SequenceNumbers.max} of the checkpoint seen so far and the one just reported. When a batch
 * other than the first comes back empty, {@code listener} is completed with the accumulated checkpoint.
 * A failure reported by the target is forwarded to {@code listener}.
 *
 * @param nextBatch                  supplier of the next list of operations to send
 * @param firstBatch                 {@code true} for the initial call; the first batch is sent even when empty
 * @param targetLocalCheckpoint      checkpoint accumulated from earlier acknowledgements
 * @param totalTranslogOps           passed through unchanged to the target
 * @param maxSeenAutoIdTimestamp     passed through unchanged to the target
 * @param maxSeqNoOfUpdatesOrDeletes passed through unchanged to the target
 * @param listener                   receives the final checkpoint or the first failure
 * @throws IOException if reading the next batch fails
 */
private void sendBatch(CheckedSupplier<List<Translog.Operation>, IOException> nextBatch, boolean firstBatch,
                       long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp,
                       long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener) throws IOException {
    final List<Translog.Operation> batch = nextBatch.get();
    final boolean nothingLeftToSend = batch.isEmpty() && firstBatch == false;
    if (nothingLeftToSend) {
        // a non-initial empty batch ends the chain: report the checkpoint gathered so far
        listener.onResponse(targetLocalCheckpoint);
        return;
    }
    // send the leftover operations or, if no operations were sent, request the target to respond with its local checkpoint
    cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(
        batch, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
        ActionListener.wrap(
            ackedCheckpoint -> sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, ackedCheckpoint),
                totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener),
            listener::onFailure
        )));
}

if (!operations.isEmpty() || totalSentOps == 0) {
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
cancellableThreads.executeIO(sendBatch);
}
private Translog.Snapshot synchronizedSnapshot(Translog.Snapshot snapshot) {
return new Translog.Snapshot() {
@Override
public synchronized Translog.Operation next() throws IOException {
return snapshot.next();
}

assert expectedTotalOps == snapshot.skippedOperations() + skippedOps + totalSentOps
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
expectedTotalOps, snapshot.skippedOperations(), skippedOps, totalSentOps);
@Override
public synchronized void close() throws IOException {
snapshot.close();
}

if (requiredOpsTracker.getCheckpoint() < endingSeqNo) {
throw new IllegalStateException("translog replay failed to cover required sequence numbers" +
" (required range [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "). first missing op is ["
+ (requiredOpsTracker.getCheckpoint() + 1) + "]");
}
@Override
public synchronized int totalOperations() {
return snapshot.totalOperations();
}

logger.trace("sent final batch of [{}][{}] (total: [{}]) translog operations", ops, new ByteSizeValue(size), expectedTotalOps);
@Override
public synchronized int skippedOperations() {
return snapshot.skippedOperations();
}

stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase2]: took [{}]", tookTime);
return new SendSnapshotResult(targetLocalCheckpoint.get(), totalSentOps, tookTime);
@Override
public synchronized int overriddenOperations() {
return snapshot.overriddenOperations();
}
};
}

void finalizeRecovery(final long targetLocalCheckpoint, final ActionListener<Void> listener) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -115,6 +119,18 @@ public class RecoverySourceHandlerTests extends ESTestCase {
private final ShardId shardId = new ShardId(INDEX_SETTINGS.getIndex(), 1);
private final ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

private ThreadPool threadPool;

/**
 * Creates the thread pool used by the tests in this class, passing the current test name to its constructor.
 */
@Before
public void setUpThreadPool() {
    this.threadPool = new TestThreadPool(getTestName());
}

/**
 * Hands the per-test thread pool to {@code terminate} after each test.
 */
@After
public void tearDownThreadPool() {
    terminate(this.threadPool);
}

public void testSendFiles() throws Throwable {
Settings settings = Settings.builder().put("indices.recovery.concurrent_streams", 1).
put("indices.recovery.concurrent_small_file_streams", 1).build();
Expand Down Expand Up @@ -198,18 +214,17 @@ public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
}

public void testSendSnapshotSendsOps() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final int fileChunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
final int fileChunkSizeInBytes = between(1, 4096);
final StartRecoveryRequest request = getStartRecoveryRequest();
final IndexShard shard = mock(IndexShard.class);
when(shard.state()).thenReturn(IndexShardState.STARTED);
final List<Translog.Operation> operations = new ArrayList<>();
final int initialNumberOfDocs = randomIntBetween(16, 64);
final int initialNumberOfDocs = randomIntBetween(10, 1000);
for (int i = 0; i < initialNumberOfDocs; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, SequenceNumbers.UNASSIGNED_SEQ_NO, true)));
}
final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(16, 64);
final int numberOfDocsWithValidSequenceNumbers = randomIntBetween(10, 1000);
for (int i = initialNumberOfDocs; i < initialNumberOfDocs + numberOfDocsWithValidSequenceNumbers; i++) {
final Engine.Index index = getIndex(Integer.toString(i));
operations.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i - initialNumberOfDocs, true)));
Expand All @@ -219,12 +234,14 @@ public void testSendSnapshotSendsOps() throws IOException {
final long endingSeqNo = randomIntBetween((int) requiredStartingSeqNo - 1, numberOfDocsWithValidSequenceNumbers - 1);

final List<Translog.Operation> shippedOps = new ArrayList<>();
final AtomicLong checkpointOnTarget = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
@Override
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp, long msu,
ActionListener<Long> listener) {
shippedOps.addAll(operations);
listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED);
checkpointOnTarget.set(randomLongBetween(checkpointOnTarget.get(), Long.MAX_VALUE));
maybeExecuteAsync(() -> listener.onResponse(checkpointOnTarget.get()));
}
};
RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
Expand All @@ -239,6 +256,7 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
for (int i = 0; i < shippedOps.size(); i++) {
assertThat(shippedOps.get(i), equalTo(operations.get(i + (int) startingSeqNo + initialNumberOfDocs)));
}
assertThat(result.targetLocalCheckpoint, equalTo(checkpointOnTarget.get()));
if (endingSeqNo >= requiredStartingSeqNo + 1) {
// check that missing ops blows up
List<Translog.Operation> requiredOps = operations.subList(0, operations.size() - 1).stream() // remove last null marker
Expand All @@ -253,6 +271,40 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
}
}

/**
 * Checks that {@code phase2} reports a failure raised by the recovery target while indexing translog operations.
 * <p>
 * The target randomly acknowledges or rejects each batch, either inline or on the test thread pool. The test
 * always waits for the {@code phase2} future to complete and then verifies that the outcome matches what the
 * target did: a rejection must surface as the injected exception, and a successful completion must mean that
 * no rejection was ever injected.
 */
public void testSendSnapshotStopOnError() throws Exception {
    final int fileChunkSizeInBytes = between(1, 10 * 1024);
    final StartRecoveryRequest request = getStartRecoveryRequest();
    final IndexShard shard = mock(IndexShard.class);
    when(shard.state()).thenReturn(IndexShardState.STARTED);
    final List<Translog.Operation> ops = new ArrayList<>();
    for (int numOps = between(1, 256), i = 0; i < numOps; i++) {
        final Engine.Index index = getIndex(Integer.toString(i));
        ops.add(new Translog.Index(index, new Engine.IndexResult(1, 1, i, true)));
    }
    final AtomicBoolean wasFailed = new AtomicBoolean();
    RecoveryTargetHandler recoveryTarget = new TestRecoveryTargetHandler() {
        @Override
        public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long timestamp,
                                            long msu, ActionListener<Long> listener) {
            if (randomBoolean()) {
                maybeExecuteAsync(() -> listener.onResponse(SequenceNumbers.NO_OPS_PERFORMED));
            } else {
                // Record the failure BEFORE dispatching it: the callback may run on another thread and complete
                // the future before this method returns, so the flag must already be visible by then.
                wasFailed.set(true);
                maybeExecuteAsync(() -> listener.onFailure(new RuntimeException("test - failed to index")));
            }
        }
    };
    RecoverySourceHandler handler = new RecoverySourceHandler(shard, recoveryTarget, request, fileChunkSizeInBytes, between(1, 10));
    PlainActionFuture<RecoverySourceHandler.SendSnapshotResult> future = new PlainActionFuture<>();
    final long startingSeqNo = randomLongBetween(0, ops.size() - 1L);
    final long endingSeqNo = randomLongBetween(startingSeqNo, ops.size() - 1L);
    handler.phase2(startingSeqNo, startingSeqNo, endingSeqNo, newTranslogSnapshot(ops, Collections.emptyList()),
        randomNonNegativeLong(), randomNonNegativeLong(), future);
    // Always wait for completion: batches may still be in flight on the thread pool when phase2 returns, so
    // inspecting wasFailed before the future is done could miss a failure injected later.
    try {
        future.actionGet();
    } catch (RuntimeException e) {
        if (wasFailed.get() == false) {
            // not the failure this test injected; let it surface unchanged
            throw e;
        }
        assertThat(e.getMessage(), equalTo("test - failed to index"));
        return;
    }
    assertThat("phase2 completed successfully although the target rejected a batch", wasFailed.get(), equalTo(false));
}

private Engine.Index getIndex(final String id) {
final String type = "test";
final ParseContext.Document document = new ParseContext.Document();
Expand Down Expand Up @@ -717,4 +769,12 @@ public void close() {
}
};
}

/**
 * Runs {@code runnable} either on the generic executor of the test thread pool or inline on the calling
 * thread; a single {@code randomBoolean()} draw decides which.
 */
private void maybeExecuteAsync(Runnable runnable) {
    final boolean fork = randomBoolean();
    if (fork == false) {
        runnable.run();
        return;
    }
    threadPool.generic().execute(runnable);
}
}