Use MultiFileTransfer in CCR remote recovery #44514

Merged
10 commits merged on Oct 22, 2019
MultiFileTransfer.java
@@ -57,7 +57,7 @@
* one of the networking threads which receive/handle the responses of the current pending file chunk requests. This process will continue
* until all chunk requests are sent/responded.
*/
abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest> implements Closeable {
public abstract class MultiFileTransfer<Request extends MultiFileTransfer.ChunkRequest> implements Closeable {
private Status status = Status.PROCESSING;
private final Logger logger;
private final ActionListener<Void> listener;
@@ -121,7 +121,7 @@ private void handleItems(List<Tuple<FileChunkResponseItem, Consumer<Exception>>>
return;
}
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
sendChunkRequest(request.v2(), ActionListener.wrap(
executeChunkRequest(request.v2(), ActionListener.wrap(
r -> addItem(requestSeqId, request.v1(), null),
e -> addItem(requestSeqId, request.v1(), e)));
}
@@ -179,7 +179,7 @@ private Tuple<StoreFileMetaData, Request> getNextRequest() throws Exception {

protected abstract Request nextChunkRequest(StoreFileMetaData md) throws IOException;

protected abstract void sendChunkRequest(Request request, ActionListener<Void> listener);
protected abstract void executeChunkRequest(Request request, ActionListener<Void> listener);

protected abstract void handleError(StoreFileMetaData md, Exception e) throws Exception;

@@ -195,7 +195,7 @@ private static class FileChunkResponseItem {
}
}

protected interface ChunkRequest {
public interface ChunkRequest {
/**
* @return {@code true} if this chunk request is the last chunk of the current file
*/
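For context on the visibility change: a minimal subclass of the now-public `MultiFileTransfer` would look roughly like the sketch below. The constructor shape and the abstract hooks (`onNewFile`, `nextChunkRequest`, `executeChunkRequest`, `handleError`, `close`) are as they appear in this diff; the class name, the fixed 1 MB chunk size, and the hook bodies are purely illustrative.

```java
// Illustrative sketch only: hook signatures and the constructor shape are taken from this diff,
// while the class name, chunk size, and hook bodies are made up.
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.recovery.MultiFileTransfer;

import java.util.List;

class ExampleFileTransfer extends MultiFileTransfer<ExampleFileTransfer.ExampleChunk> {

    private long offset; // per-file position between nextChunkRequest calls

    ExampleFileTransfer(Logger logger, ThreadContext threadContext, ActionListener<Void> listener,
                        int maxConcurrentFileChunks, List<StoreFileMetaData> files) {
        super(logger, threadContext, listener, maxConcurrentFileChunks, files);
    }

    @Override
    protected void onNewFile(StoreFileMetaData md) {
        offset = 0; // per-file state is reset before the first chunk of each file is requested
    }

    @Override
    protected ExampleChunk nextChunkRequest(StoreFileMetaData md) {
        // carve the next chunk out of the current file (fixed 1 MB chunk size for illustration)
        final int length = Math.toIntExact(Math.min(1024 * 1024, md.length() - offset));
        offset += length;
        return new ExampleChunk(md, length, offset == md.length());
    }

    @Override
    protected void executeChunkRequest(ExampleChunk request, ActionListener<Void> listener) {
        // ship the chunk to its destination; MultiFileTransfer keeps at most
        // maxConcurrentFileChunks of these requests in flight at any time
        listener.onResponse(null);
    }

    @Override
    protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
        throw e; // real implementations may mark the store corrupted first (see the CCR hunk below)
    }

    @Override
    public void close() {
        // release any per-transfer resources held by the implementation
    }

    static final class ExampleChunk implements MultiFileTransfer.ChunkRequest {
        final StoreFileMetaData md;
        final int length;
        private final boolean lastChunk;

        ExampleChunk(StoreFileMetaData md, int length, boolean lastChunk) {
            this.md = md;
            this.length = length;
            this.lastChunk = lastChunk;
        }

        @Override
        public boolean lastChunk() {
            return lastChunk;
        }
    }
}
```

The per-file `offset` field mirrors how the CCR restore further down tracks its position between `nextChunkRequest` calls.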
MultiFileWriter.java
@@ -27,9 +27,11 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.transport.Transports;

import java.io.IOException;
import java.util.Arrays;
@@ -39,10 +41,12 @@
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class MultiFileWriter implements Releasable {
public class MultiFileWriter extends AbstractRefCounted implements Releasable {

public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
super("multi_file_writer");
this.store = store;
this.indexState = indexState;
this.tempFilePrefix = tempFilePrefix;
@@ -51,6 +55,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF
}

private final Runnable ensureOpen;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Logger logger;
private final Store store;
private final RecoveryState.Index indexState;
@@ -64,6 +69,7 @@ public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempF

public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk)
throws IOException {
assert Transports.assertNotTransportThread("multi_file_writer");
final FileChunkWriter writer = fileChunkWriters.computeIfAbsent(fileMetaData.name(), name -> new FileChunkWriter());
writer.writeChunk(new FileChunk(fileMetaData, content, position, lastChunk));
}
@@ -138,6 +144,13 @@ private void innerWriteFileChunk(StoreFileMetaData fileMetaData, long position,

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
decRef();
}
}

@Override
protected void closeInternal() {
fileChunkWriters.clear();
// clean open index outputs
Iterator<Map.Entry<String, IndexOutput>> iterator = openIndexOutputs.entrySet().iterator();
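The move to `AbstractRefCounted` changes the writer's lifecycle: `close()` now only releases the creation-time reference (guarded by the `closed` flag), and the real cleanup in `closeInternal()` runs once the last reference is released. A rough sketch of the consumer-side pattern this enables, the same one used in the CcrRepository hunk further down; the wrapper class and method names are illustrative.

```java
// Sketch of the consumer-side ref-counting pattern; only MultiFileWriter, Releasable,
// StoreFileMetaData and BytesReference are real types, the wrapper class is made up.
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.recovery.MultiFileWriter;

import java.io.IOException;

final class ChunkConsumer {
    static void writeChunkSafely(MultiFileWriter writer, StoreFileMetaData md,
                                 long offset, BytesReference chunk, boolean lastChunk) throws IOException {
        // Hold a reference for the duration of the write: if another thread calls close()
        // concurrently, closeInternal() (which cleans up open index outputs and temporary
        // recovery files) is deferred until this reference is released.
        writer.incRef();
        try (Releasable ignored = writer::decRef) {
            writer.writeFileChunk(md, offset, chunk, lastChunk);
        }
    }
}
```

Callers that never take extra references keep the old behaviour, since `close()` releases the creation-time reference exactly once thanks to the `closed` flag.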
RecoverySourceHandler.java
@@ -728,7 +728,7 @@ protected FileChunk nextChunkRequest(StoreFileMetaData md) throws IOException {
}

@Override
protected void sendChunkRequest(FileChunk request, ActionListener<Void> listener) {
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
cancellableThreads.execute(() -> recoveryTarget.writeFileChunk(
request.md, request.position, request.content, request.lastChunk, translogOps.getAsInt(), listener));
}
CcrRepository.java
@@ -12,7 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
@@ -22,6 +21,7 @@
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.ListenerTimeouts;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -32,18 +32,16 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
@@ -54,6 +52,7 @@
import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.recovery.MultiFileTransfer;
import org.elasticsearch.indices.recovery.MultiFileWriter;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
@@ -88,12 +87,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease;
import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncRenewRetentionLease;
@@ -477,97 +475,82 @@ void restoreFiles(Store store) throws IOException {
}

@Override
protected void restoreFiles(List<FileInfo> filesToRecover, Store store) throws IOException {
protected void restoreFiles(List<FileInfo> filesToRecover, Store store) {
logger.trace("[{}] starting CCR restore of {} files", shardId, filesToRecover);
final PlainActionFuture<Void> restoreFilesFuture = new PlainActionFuture<>();
final List<StoreFileMetaData> mds = filesToRecover.stream().map(FileInfo::metadata).collect(Collectors.toList());
final MultiFileTransfer<FileChunk> multiFileTransfer = new MultiFileTransfer<>(
logger, threadPool.getThreadContext(), restoreFilesFuture, ccrSettings.getMaxConcurrentFileChunks(), mds) {

try (MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {
})) {
final LocalCheckpointTracker requestSeqIdTracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
final AtomicReference<Tuple<StoreFileMetaData, Exception>> error = new AtomicReference<>();
final MultiFileWriter multiFileWriter = new MultiFileWriter(store, recoveryState.getIndex(), "", logger, () -> {});
long offset = 0;

for (FileInfo fileInfo : filesToRecover) {
final long fileLength = fileInfo.length();
long offset = 0;
while (offset < fileLength && error.get() == null) {
final long requestSeqId = requestSeqIdTracker.generateSeqNo();
try {
requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqId - ccrSettings.getMaxConcurrentFileChunks());

if (error.get() != null) {
requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
break;
}

final int bytesRequested = Math.toIntExact(
Math.min(ccrSettings.getChunkSize().getBytes(), fileLength - offset));
offset += bytesRequested;

final GetCcrRestoreFileChunkRequest request =
new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileInfo.name(), bytesRequested);
logger.trace("[{}] [{}] fetching chunk for file [{}], expected offset: {}, size: {}", shardId, snapshotId,
fileInfo.name(), offset, bytesRequested);

TimeValue timeout = ccrSettings.getRecoveryActionTimeout();
ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> listener =
ListenerTimeouts.wrapWithTimeout(threadPool, ActionListener.wrap(
r -> threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
}

@Override
protected void doRun() throws Exception {
final int actualChunkSize = r.getChunk().length();
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}", shardId,
snapshotId, fileInfo.name(), r.getOffset(), actualChunkSize);
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
final boolean lastChunk = r.getOffset() + actualChunkSize >= fileLength;
multiFileWriter.writeFileChunk(fileInfo.metadata(), r.getOffset(), r.getChunk(), lastChunk);
requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
}
}),
e -> {
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
}
), timeout, ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME);
remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE, request, listener);
} catch (Exception e) {
error.compareAndSet(null, Tuple.tuple(fileInfo.metadata(), e));
requestSeqIdTracker.markSeqNoAsProcessed(requestSeqId);
}
}
@Override
protected void onNewFile(StoreFileMetaData md) {
offset = 0;
}

try {
requestSeqIdTracker.waitForProcessedOpsToComplete(requestSeqIdTracker.getMaxSeqNo());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchException(e);
@Override
protected FileChunk nextChunkRequest(StoreFileMetaData md) {
final int bytesRequested = Math.toIntExact(Math.min(ccrSettings.getChunkSize().getBytes(), md.length() - offset));
offset += bytesRequested;
return new FileChunk(md, bytesRequested, offset == md.length());
}
if (error.get() != null) {
handleError(store, error.get().v2());

@Override
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
final ActionListener<GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse> threadedListener
= new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.GENERIC, ActionListener.wrap(
r -> {
writeFileChunk(request.md, r);
listener.onResponse(null);
}, listener::onFailure), false);

remoteClient.execute(GetCcrRestoreFileChunkAction.INSTANCE,
new GetCcrRestoreFileChunkRequest(node, sessionUUID, request.md.name(), request.bytesRequested),
ListenerTimeouts.wrapWithTimeout(threadPool, threadedListener, ccrSettings.getRecoveryActionTimeout(),
ThreadPool.Names.GENERIC, GetCcrRestoreFileChunkAction.NAME));
}
}

logger.trace("[{}] completed CCR restore", shardId);
}
private void writeFileChunk(StoreFileMetaData md,
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse r) throws Exception {
final int actualChunkSize = r.getChunk().length();
logger.trace("[{}] [{}] got response for file [{}], offset: {}, length: {}",
shardId, snapshotId, md.name(), r.getOffset(), actualChunkSize);
final long nanosPaused = ccrSettings.getRateLimiter().maybePause(actualChunkSize);
throttleListener.accept(nanosPaused);
multiFileWriter.incRef();
try (Releasable ignored = multiFileWriter::decRef) {
final boolean lastChunk = r.getOffset() + actualChunkSize >= md.length();
multiFileWriter.writeFileChunk(md, r.getOffset(), r.getChunk(), lastChunk);
} catch (Exception e) {
handleError(md, e);
throw e;
}
}

@Override
protected void handleError(StoreFileMetaData md, Exception e) throws Exception {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
try {
store.markStoreCorrupted(corruptIndexException);
} catch (IOException ioe) {
logger.warn("store cannot be marked as corrupted", e);
}
throw corruptIndexException;
}
throw e;
}

private void handleError(Store store, Exception e) throws IOException {
final IOException corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
try {
store.markStoreCorrupted(corruptIndexException);
} catch (IOException ioe) {
logger.warn("store cannot be marked as corrupted", e);
@Override
public void close() {
multiFileWriter.close();
}
throw corruptIndexException;
} else {
ExceptionsHelper.reThrowIfNotNull(e);
}
};
multiFileTransfer.start();
restoreFilesFuture.actionGet();
logger.trace("[{}] completed CCR restore", shardId);
}

@Override
@@ -581,5 +564,22 @@ public void close() {
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(ccrSettings.getRecoveryActionTimeout());
}

private static class FileChunk implements MultiFileTransfer.ChunkRequest {
final StoreFileMetaData md;
final int bytesRequested;
final boolean lastChunk;

FileChunk(StoreFileMetaData md, int bytesRequested, boolean lastChunk) {
this.md = md;
this.bytesRequested = bytesRequested;
this.lastChunk = lastChunk;
}

@Override
public boolean lastChunk() {
return lastChunk;
}
}
}
}
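To make the chunking arithmetic in the new `nextChunkRequest` override concrete (bytesRequested capped by the configured chunk size, a per-file offset reset in `onNewFile`, and `lastChunk` set once the offset reaches the file length), here is a standalone toy version; the file length and chunk size are invented, and `chunkSize` stands in for `ccrSettings.getChunkSize().getBytes()`.

```java
// Toy illustration of the chunking logic in the anonymous MultiFileTransfer above.
// The numbers are invented; chunkSize stands in for ccrSettings.getChunkSize().getBytes().
public class ChunkingDemo {
    public static void main(String[] args) {
        final long fileLength = 10_500;  // hypothetical file size in bytes
        final long chunkSize = 4_096;    // hypothetical CCR recovery chunk size
        long offset = 0;                 // reset to 0 by onNewFile for every file

        while (offset < fileLength) {
            final int bytesRequested = Math.toIntExact(Math.min(chunkSize, fileLength - offset));
            offset += bytesRequested;
            final boolean lastChunk = offset == fileLength;
            System.out.println("request " + bytesRequested + " bytes, lastChunk=" + lastChunk);
        }
        // prints:
        // request 4096 bytes, lastChunk=false
        // request 4096 bytes, lastChunk=false
        // request 2308 bytes, lastChunk=true
    }
}
```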