Skip to content

Commit

Permalink
Make PrimaryReplicaResyncer Fork to Generic Pool (#69949)
Browse files Browse the repository at this point in the history
Reading ops from the translog snapshot must not run on the transport thread.
When sending more than one batch of ops the listener (and thus `run`) would be
invoked on the transport thread for all but the first batch of ops.
=> Forking to the generic pool like we do for sending ops during recovery.
  • Loading branch information
original-brownbear authored Mar 4, 2021
1 parent 36f3ea9 commit e622b2c
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.transport.Transports;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -145,6 +146,7 @@ public Translog.Operation next() throws IOException {
private boolean assertAccessingThread() {
assert singleConsumer == false || creationThread == Thread.currentThread() :
"created by [" + creationThread + "] != current thread [" + Thread.currentThread() + "]";
assert Transports.assertNotTransportThread("reading changes snapshot may involve slow IO");
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -45,7 +46,7 @@ public class PrimaryReplicaSyncer {

private static final Logger logger = LogManager.getLogger(PrimaryReplicaSyncer.class);

private final TaskManager taskManager;
private final TransportService transportService;
private final SyncAction syncAction;

public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
Expand All @@ -54,12 +55,12 @@ public class PrimaryReplicaSyncer {

@Inject
public PrimaryReplicaSyncer(TransportService transportService, TransportResyncReplicationAction syncAction) {
this(transportService.getTaskManager(), syncAction);
this(transportService, (SyncAction) syncAction);
}

// for tests
public PrimaryReplicaSyncer(TaskManager taskManager, SyncAction syncAction) {
this.taskManager = taskManager;
public PrimaryReplicaSyncer(TransportService transportService, SyncAction syncAction) {
this.transportService = transportService;
this.syncAction = syncAction;
}

Expand Down Expand Up @@ -145,6 +146,7 @@ public void onFailure(final Exception e) {
private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener<ResyncTask> listener) {
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
final TaskManager taskManager = transportService.getTaskManager();
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
@Override
Expand All @@ -163,7 +165,7 @@ public void onFailure(Exception e) {
};
try {
new SnapshotSender(syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run();
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, transportService.getThreadPool().generic(), wrappedListener).run();
} catch (Exception e) {
wrappedListener.onFailure(e);
}
Expand All @@ -182,6 +184,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
private final long primaryTerm;
private final ShardId shardId;
private final Translog.Snapshot snapshot;
private final Executor executor; // executor to fork to for reading and then sending ops from the snapshot
private final long startingSeqNo;
private final long maxSeqNo;
private final long maxSeenAutoIdTimestamp;
Expand All @@ -194,7 +197,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R

SnapshotSender(SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo,
long maxSeenAutoIdTimestamp, ActionListener<Void> listener) {
long maxSeenAutoIdTimestamp, Executor executor, ActionListener<Void> listener) {
this.logger = PrimaryReplicaSyncer.logger;
this.syncAction = syncAction;
this.task = task;
Expand All @@ -206,13 +209,14 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
this.startingSeqNo = startingSeqNo;
this.maxSeqNo = maxSeqNo;
this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp;
this.executor = executor;
this.listener = listener;
task.setTotalOperations(snapshot.totalOperations());
}

@Override
public void onResponse(ResyncReplicationResponse response) {
run();
executor.execute(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -42,6 +43,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand All @@ -51,7 +53,8 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {

public void testSyncerSendsOffCorrectDocuments() throws Exception {
IndexShard shard = newStartedShard(true);
TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
TransportService transportService = new MockTransport().createTransportService(Settings.EMPTY, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> null, null, emptySet());
AtomicBoolean syncActionCalled = new AtomicBoolean();
List<ResyncReplicationRequest> resyncRequests = new ArrayList<>();
PrimaryReplicaSyncer.SyncAction syncAction =
Expand All @@ -62,7 +65,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
listener.onResponse(new ResyncReplicationResponse());
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction);
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction);
syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10)));

int numDocs = randomInt(10);
Expand Down Expand Up @@ -131,8 +134,9 @@ public void testSyncerOnClosingShard() throws Exception {
syncActionCalled.set(true);
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
};
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), syncAction);
TransportService transportService = new MockTransport().createTransportService(Settings.EMPTY, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> null, null, emptySet());
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction);
syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately

int numDocs = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -159,7 +160,8 @@ protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard>
private volatile ReplicationTargets replicationTargets;

private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()),
new MockTransport().createTransportService(Settings.EMPTY, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> null, null, Collections.emptySet()),
(request, parentTask, primaryAllocationId, primaryTerm, listener) -> {
try {
new ResyncAction(request, listener, ReplicationGroup.this).execute();
Expand Down

0 comments on commit e622b2c

Please sign in to comment.