Make PrimaryReplicaResyncer Fork to Generic Pool #69949

Merged
@@ -31,6 +31,7 @@
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.transport.Transports;

import java.io.Closeable;
import java.io.IOException;
@@ -145,6 +146,7 @@ public Translog.Operation next() throws IOException {
private boolean assertAccessingThread() {
assert singleConsumer == false || creationThread == Thread.currentThread() :
"created by [" + creationThread + "] != current thread [" + Thread.currentThread() + "]";
+assert Transports.assertNotTransportThread("reading changes snapshot may involve slow IO");
return true;
}

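The assertion added above uses Transports.assertNotTransportThread to guarantee that reading the changes snapshot, which may involve slow IO, never runs on a transport thread. As a rough sketch of that guard pattern (the class below and its thread-name check are illustrative assumptions, not the Elasticsearch implementation), the helper returns true so it can live inside an assert and costs nothing when assertions are disabled:

// Illustrative sketch only: a guard that can sit inside an `assert` statement, modeled on the
// pattern used above. The "transport_worker" thread-name convention is an assumption here.
public final class TransportThreadGuard {

    private TransportThreadGuard() {}

    // Returns true so callers can write `assert TransportThreadGuard.assertNotTransportThread(...)`;
    // the assertion only trips when the current thread looks like a transport worker.
    public static boolean assertNotTransportThread(String reason) {
        final String threadName = Thread.currentThread().getName();
        assert threadName.contains("transport_worker") == false
            : "expected to not run on a transport thread [" + threadName + "], reason [" + reason + "]";
        return true;
    }
}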
@@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

@@ -45,7 +46,7 @@ public class PrimaryReplicaSyncer {

private static final Logger logger = LogManager.getLogger(PrimaryReplicaSyncer.class);

-private final TaskManager taskManager;
+private final TransportService transportService;
private final SyncAction syncAction;

public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
@@ -54,12 +55,12 @@ public class PrimaryReplicaSyncer {

@Inject
public PrimaryReplicaSyncer(TransportService transportService, TransportResyncReplicationAction syncAction) {
-this(transportService.getTaskManager(), syncAction);
+this(transportService, (SyncAction) syncAction);
}

// for tests
-public PrimaryReplicaSyncer(TaskManager taskManager, SyncAction syncAction) {
-this.taskManager = taskManager;
+public PrimaryReplicaSyncer(TransportService transportService, SyncAction syncAction) {
+this.transportService = transportService;
this.syncAction = syncAction;
}

@@ -145,6 +146,7 @@ public void onFailure(final Exception e) {
private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener<ResyncTask> listener) {
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
+final TaskManager taskManager = transportService.getTaskManager();
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
@Override
@@ -163,7 +165,7 @@ public void onFailure(Exception e) {
};
try {
new SnapshotSender(syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
-startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run();
+startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, transportService.getThreadPool().generic(), wrappedListener).run();
} catch (Exception e) {
wrappedListener.onFailure(e);
}
@@ -182,6 +184,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<ResyncReplicationResponse> {
private final long primaryTerm;
private final ShardId shardId;
private final Translog.Snapshot snapshot;
+private final Executor executor; // executor to fork to for reading and then sending ops from the snapshot
private final long startingSeqNo;
private final long maxSeqNo;
private final long maxSeenAutoIdTimestamp;
@@ -194,7 +197,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<ResyncReplicationResponse> {

SnapshotSender(SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo,
-long maxSeenAutoIdTimestamp, ActionListener<Void> listener) {
+long maxSeenAutoIdTimestamp, Executor executor, ActionListener<Void> listener) {
this.logger = PrimaryReplicaSyncer.logger;
this.syncAction = syncAction;
this.task = task;
@@ -206,13 +209,14 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<ResyncReplicationResponse> {
this.startingSeqNo = startingSeqNo;
this.maxSeqNo = maxSeqNo;
this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp;
+this.executor = executor;
this.listener = listener;
task.setTotalOperations(snapshot.totalOperations());
}

@Override
public void onResponse(ResyncReplicationResponse response) {
-run();
+executor.execute(this);
}

@Override
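The core behavioural change is in SnapshotSender.onResponse: instead of calling run() again on whichever thread delivered the replication response, it now forks the next round onto the injected executor, which resync wires to the generic thread pool. A minimal, self-contained sketch of that fork-on-response pattern, using illustrative names rather than the real Elasticsearch classes:

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the fork-on-response pattern: each response re-schedules the sender on a pool
// (standing in for the generic thread pool) so slow reads never run on the responding thread.
final class ChunkSender implements Runnable {
    private final Executor executor;       // pool to fork to between chunks
    private final Iterator<String> chunks; // stand-in for the translog snapshot
    private final CountDownLatch done;

    ChunkSender(Executor executor, Iterator<String> chunks, CountDownLatch done) {
        this.executor = executor;
        this.chunks = chunks;
        this.done = done;
    }

    @Override
    public void run() {
        if (chunks.hasNext() == false) {
            done.countDown();
            return;
        }
        final String chunk = chunks.next();
        // stand-in for the replication call; the response callback forks the next round
        sendAsync(chunk, () -> executor.execute(this));
    }

    private void sendAsync(String chunk, Runnable onResponse) {
        onResponse.run(); // in the real code this is the transport response listener firing
    }

    public static void main(String[] args) throws Exception {
        ExecutorService generic = Executors.newCachedThreadPool();
        CountDownLatch done = new CountDownLatch(1);
        new ChunkSender(generic, List.of("chunk-1", "chunk-2", "chunk-3").iterator(), done).run();
        done.await();
        generic.shutdown();
    }
}

Forking between chunks keeps the potentially IO-heavy snapshot reads off transport threads, which is exactly what the new assertNotTransportThread assertion enforces.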
@@ -32,7 +32,8 @@
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.tasks.Task;
-import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.test.transport.MockTransport;
+import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.nio.ByteBuffer;
@@ -42,6 +43,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

+import static java.util.Collections.emptySet;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@@ -51,7 +53,8 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {

public void testSyncerSendsOffCorrectDocuments() throws Exception {
IndexShard shard = newStartedShard(true);
-TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
+TransportService transportService = new MockTransport().createTransportService(Settings.EMPTY, threadPool,
+TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> null, null, emptySet());
AtomicBoolean syncActionCalled = new AtomicBoolean();
List<ResyncReplicationRequest> resyncRequests = new ArrayList<>();
PrimaryReplicaSyncer.SyncAction syncAction =
@@ -62,7 +65,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
listener.onResponse(new ResyncReplicationResponse());
};
-PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction);
+PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction);
syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10)));

int numDocs = randomInt(10);
@@ -131,8 +134,9 @@ public void testSyncerOnClosingShard() throws Exception {
syncActionCalled.set(true);
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
};
-PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(
-new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), syncAction);
+TransportService transportService = new MockTransport().createTransportService(Settings.EMPTY, threadPool,
+TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> null, null, emptySet());
+PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction);
syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately

int numDocs = 10;
@@ -73,9 +73,10 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
-import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.test.transport.MockTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
+import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
@@ -159,7 +160,8 @@ protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard>
private volatile ReplicationTargets replicationTargets;

private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(
-new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()),
+new MockTransport().createTransportService(Settings.EMPTY, threadPool,
+TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> null, null, Collections.emptySet()),
(request, parentTask, primaryAllocationId, primaryTerm, listener) -> {
try {
new ResyncAction(request, listener, ReplicationGroup.this).execute();

Review comments on the MockTransport-based TransportService:

Contributor: Do we need to shut this (at least the threadpool) down cleanly?

Member (Author): It's just using the fake transport, so I don't think the service needs any shutting down here. The threadPool is torn down via the parent IndexShardTestCase, so I think we're good.

Contributor: Oh right, we already had a threadpool; we're just using the transport service as a wrapper. Nvm.
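On the review question about cleanup: the thread pool borrowed here is owned and terminated by the surrounding test case, and the MockTransport-backed TransportService is never started, so the field initializer needs no teardown of its own. A hedged sketch of that usual test lifecycle (the class name and method bodies below are illustrative, not a quote of IndexShardTestCase):

import java.util.concurrent.TimeUnit;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;

// Illustrative test-lifecycle sketch: the thread pool is created in setUp and terminated in
// tearDown by the base test class, so helpers that merely borrow it need no cleanup of their own.
public abstract class ExampleShardTestCase /* extends ESTestCase in the real code base */ {

    protected ThreadPool threadPool;

    public void setUp() throws Exception {
        threadPool = new TestThreadPool(getClass().getName());
    }

    public void tearDown() throws Exception {
        // terminate waits for the pool's tasks to finish before shutting it down
        ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
    }
}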