From bc8987bc71274913c74d427271f45c240b13c490 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 4 Mar 2021 11:09:06 +0100 Subject: [PATCH] Make PrimaryReplicaSyncer Fork to Generic Pool (#69949) (#69952) Reading ops from the translog snapshot must not run on the transport thread. When sending more than one batch of ops, the listener (and thus `run`) would be invoked on the transport thread for all but the first batch of ops. Fix this by forking to the generic pool, as we already do when sending ops during recovery. --- .../index/engine/LuceneChangesSnapshot.java | 2 ++ .../index/shard/PrimaryReplicaSyncer.java | 18 +++++++++++------- .../shard/PrimaryReplicaSyncerTests.java | 19 ++++++++++++------- .../ESIndexLevelReplicationTestCase.java | 6 ++++-- 4 files changed, 29 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 05b58afb5b5b4..361b4a8497c3a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; @@ -151,6 +152,7 @@ public Translog.Operation next() throws IOException { private boolean assertAccessingThread() { assert singleConsumer == false || creationThread == Thread.currentThread() : "created by [" + creationThread + "] != current thread [" + Thread.currentThread() + "]"; + assert Transports.assertNotTransportThread("reading changes snapshot may involve slow IO"); return true; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java 
b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 39c12e8d69adc..e7c6ee68174bf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -46,7 +47,7 @@ public class PrimaryReplicaSyncer { private static final Logger logger = LogManager.getLogger(PrimaryReplicaSyncer.class); - private final TaskManager taskManager; + private final TransportService transportService; private final SyncAction syncAction; public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); @@ -55,12 +56,12 @@ public class PrimaryReplicaSyncer { @Inject public PrimaryReplicaSyncer(TransportService transportService, TransportResyncReplicationAction syncAction) { - this(transportService.getTaskManager(), syncAction); + this(transportService, (SyncAction) syncAction); } // for tests - public PrimaryReplicaSyncer(TaskManager taskManager, SyncAction syncAction) { - this.taskManager = taskManager; + public PrimaryReplicaSyncer(TransportService transportService, SyncAction syncAction) { + this.transportService = transportService; this.syncAction = syncAction; } @@ -147,6 +148,7 @@ public void onFailure(final Exception e) { private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot, long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener listener) { ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId); + final TaskManager taskManager = transportService.getTaskManager(); ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not 
transport :-) ActionListener wrappedListener = new ActionListener() { @Override @@ -165,7 +167,7 @@ public void onFailure(Exception e) { }; try { new SnapshotSender(syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(), - startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run(); + startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, transportService.getThreadPool().generic(), wrappedListener).run(); } catch (Exception e) { wrappedListener.onFailure(e); } @@ -184,6 +186,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener listener) { + long maxSeenAutoIdTimestamp, Executor executor, ActionListener listener) { this.logger = PrimaryReplicaSyncer.logger; this.syncAction = syncAction; this.task = task; @@ -208,13 +211,14 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener null, null, emptySet()); AtomicBoolean syncActionCalled = new AtomicBoolean(); List resyncRequests = new ArrayList<>(); PrimaryReplicaSyncer.SyncAction syncAction = @@ -73,7 +76,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class)); listener.onResponse(new ResyncReplicationResponse()); }; - PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction); + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction); syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10))); int numDocs = randomInt(10); @@ -139,8 +142,9 @@ public void testSyncerOnClosingShard() throws Exception { syncActionCalled.set(true); threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); }; - PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer( - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), syncAction); + TransportService transportService = new MockTransport().createTransportService(Settings.EMPTY, threadPool, + 
TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> null, null, emptySet()); + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction); syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately int numDocs = 10; @@ -201,13 +205,14 @@ public void testDoNotSendOperationsWithoutSequenceNumber() throws Exception { Engine.HistorySource source = shard.indexSettings.isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG; doReturn(TestTranslog.newSnapshotFromOperations(operations)).when(shard).getHistoryOperations(anyString(), eq(source), anyLong()); - TaskManager taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()); + TransportService transportService = new MockTransport().createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> null, null, emptySet()); List sentOperations = new ArrayList<>(); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { sentOperations.addAll(Arrays.asList(request.getOperations())); listener.onResponse(new ResyncReplicationResponse()); }; - PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction); + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction); syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10))); PlainActionFuture fut = new PlainActionFuture<>(); syncer.resync(shard, fut); diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 760ba4e01a86d..f4c0eb4bbd21f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ 
b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -73,9 +73,10 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; -import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; @@ -164,7 +165,8 @@ protected class ReplicationGroup implements AutoCloseable, Iterable private volatile ReplicationTargets replicationTargets; private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer( - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), + new MockTransport().createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> null, null, Collections.emptySet()), (request, parentTask, primaryAllocationId, primaryTerm, listener) -> { try { new ResyncAction(request, listener, ReplicationGroup.this).execute();