From 3e33c5f1096ba94c41568624f6e7ee80a63df983 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Thu, 4 Mar 2021 08:06:41 +0100
Subject: [PATCH] Make PrimaryReplicaResyncer Fork to Generic Pool

Reading ops from the translog snapshot must not run on the transport thread.
When sending more than one batch of ops the listener (and thus `run`) would be
invoked on the transport thread for all but the first batch of ops.
=> Forking to the generic pool like we do for sending ops during recovery.
---
 .../index/engine/LuceneChangesSnapshot.java    |  2 ++
 .../index/shard/PrimaryReplicaSyncer.java      | 18 +++++++++++-------
 .../index/shard/PrimaryReplicaSyncerTests.java | 14 +++++++++-----
 .../ESIndexLevelReplicationTestCase.java       |  6 ++++--
 4 files changed, 26 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java
index 5c2012f41569a..c3a0cf38d1d22 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.transport.Transports;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -145,6 +146,7 @@ public Translog.Operation next() throws IOException {
     private boolean assertAccessingThread() {
         assert singleConsumer == false || creationThread == Thread.currentThread() :
             "created by [" + creationThread + "] != current thread [" + Thread.currentThread() + "]";
+        assert Transports.assertNotTransportThread("reading changes snapshot may involve slow IO");
         return true;
     }
 
diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
index 1b5fc05c4b1b6..ad34a1c4f8a7d 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java
@@ -36,6 +36,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -45,7 +46,7 @@ public class PrimaryReplicaSyncer {
 
     private static final Logger logger = LogManager.getLogger(PrimaryReplicaSyncer.class);
 
-    private final TaskManager taskManager;
+    private final TransportService transportService;
     private final SyncAction syncAction;
 
     public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);
@@ -54,12 +55,12 @@ public class PrimaryReplicaSyncer {
 
     @Inject
     public PrimaryReplicaSyncer(TransportService transportService, TransportResyncReplicationAction syncAction) {
-        this(transportService.getTaskManager(), syncAction);
+        this(transportService, (SyncAction) syncAction);
     }
 
     // for tests
-    public PrimaryReplicaSyncer(TaskManager taskManager, SyncAction syncAction) {
-        this.taskManager = taskManager;
+    public PrimaryReplicaSyncer(TransportService transportService, SyncAction syncAction) {
+        this.transportService = transportService;
         this.syncAction = syncAction;
     }
 
@@ -145,6 +146,7 @@ public void onFailure(final Exception e) {
     private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm,
                         final Translog.Snapshot snapshot, long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener listener) {
         ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
+        final TaskManager taskManager = transportService.getTaskManager();
         ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
         ActionListener wrappedListener = new ActionListener() {
             @Override
@@ -163,7 +165,7 @@ public void onFailure(Exception e) {
         };
         try {
             new SnapshotSender(syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
-                startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run();
+                startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, transportService.getThreadPool().generic(), wrappedListener).run();
         } catch (Exception e) {
             wrappedListener.onFailure(e);
         }
@@ -182,6 +184,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener
 listener) {
+                       long maxSeenAutoIdTimestamp, Executor executor, ActionListener listener) {
             this.logger = PrimaryReplicaSyncer.logger;
             this.syncAction = syncAction;
             this.task = task;
@@ -206,13 +209,14 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener
 null, null, emptySet());
         AtomicBoolean syncActionCalled = new AtomicBoolean();
         List resyncRequests = new ArrayList<>();
         PrimaryReplicaSyncer.SyncAction syncAction =
@@ -62,7 +65,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
                 assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class));
                 listener.onResponse(new ResyncReplicationResponse());
             };
-        PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction);
+        PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction);
         syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10)));
 
         int numDocs = randomInt(10);
@@ -131,8 +134,9 @@ public void testSyncerOnClosingShard() throws Exception {
                 syncActionCalled.set(true);
                 threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse()));
             };
-        PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(
-            new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), syncAction);
+        TransportService transportService = new MockTransport().createTransportService(Settings.EMPTY, threadPool,
+            TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> null, null, emptySet());
+        PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction);
         syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately
 
         int numDocs = 10;
diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
index c219bbf3c6466..45df79937200f 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
@@ -73,9 +73,10 @@
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.indices.recovery.RecoveryTarget;
-import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.test.transport.MockTransport;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool.Names;
+import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -159,7 +160,8 @@ protected class ReplicationGroup implements AutoCloseable, Iterable
         private volatile ReplicationTargets replicationTargets;
 
         private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer(
-            new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()),
+            new MockTransport().createTransportService(Settings.EMPTY, threadPool,
+                TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> null, null, Collections.emptySet()),
             (request, parentTask, primaryAllocationId, primaryTerm, listener) -> {
                 try {
                     new ResyncAction(request, listener, ReplicationGroup.this).execute();
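
Note (not part of the patch): below is a minimal, self-contained Java sketch of the threading pattern described in the commit message, where the response callback forks the next batch onto a generic executor instead of re-entering run() on the calling (transport) thread. All names in the sketch (BatchSender, sendBatch, the String-based ops) are invented for illustration; the actual change passes transportService.getThreadPool().generic() into SnapshotSender.

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Illustrative sketch only, not Elasticsearch code. The response callback does not call
// run() again on the calling thread (which in Elasticsearch would be a transport thread);
// it forks the next batch onto a generic executor, so reading the next batch of ops never
// happens on the transport thread.
final class BatchSender implements Runnable {

    private final Iterator<List<String>> batches;  // stand-in for the translog snapshot
    private final ExecutorService generic;         // stand-in for the generic thread pool
    private final Consumer<String> listener;       // stand-in for the completion listener

    BatchSender(Iterator<List<String>> batches, ExecutorService generic, Consumer<String> listener) {
        this.batches = batches;
        this.generic = generic;
        this.listener = listener;
    }

    @Override
    public void run() {
        if (batches.hasNext() == false) {
            listener.accept("all batches sent");
            return;
        }
        final List<String> batch = batches.next();
        // On response, fork back to the generic pool instead of looping on this thread.
        sendBatch(batch, () -> generic.execute(this));
    }

    // Pretend transport call: invokes the response callback once the batch is acknowledged.
    private void sendBatch(List<String> batch, Runnable onResponse) {
        System.out.println(Thread.currentThread().getName() + " sending " + batch);
        onResponse.run();
    }

    public static void main(String[] args) throws InterruptedException {
        final ExecutorService generic = Executors.newFixedThreadPool(2);
        final CountDownLatch done = new CountDownLatch(1);
        final Iterator<List<String>> batches =
            List.of(List.of("op1", "op2"), List.of("op3"), List.of("op4")).iterator();
        new BatchSender(batches, generic, message -> {
            System.out.println(message);
            done.countDown();
        }).run();
        done.await();
        generic.shutdown();
    }
}

Running the sketch shows that only the first batch is sent from the main (caller) thread; every subsequent batch is sent from a pool thread, which is the behavior the patch establishes for resync batches.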