diff --git a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java index 5c2012f41569a..c3a0cf38d1d22 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java @@ -31,6 +31,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.transport.Transports; import java.io.Closeable; import java.io.IOException; @@ -145,6 +146,7 @@ public Translog.Operation next() throws IOException { private boolean assertAccessingThread() { assert singleConsumer == false || creationThread == Thread.currentThread() : "created by [" + creationThread + "] != current thread [" + Thread.currentThread() + "]"; + assert Transports.assertNotTransportThread("reading changes snapshot may involve slow IO"); return true; } diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index 1b5fc05c4b1b6..ad34a1c4f8a7d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -36,6 +36,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -45,7 +46,7 @@ public class PrimaryReplicaSyncer { private static final Logger logger = LogManager.getLogger(PrimaryReplicaSyncer.class); - private final TaskManager taskManager; + private final TransportService transportService; private final SyncAction syncAction; public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); @@ -54,12 +55,12 @@ public class PrimaryReplicaSyncer { @Inject public PrimaryReplicaSyncer(TransportService transportService, TransportResyncReplicationAction syncAction) { - this(transportService.getTaskManager(), syncAction); + this(transportService, (SyncAction) syncAction); } // for tests - public PrimaryReplicaSyncer(TaskManager taskManager, SyncAction syncAction) { - this.taskManager = taskManager; + public PrimaryReplicaSyncer(TransportService transportService, SyncAction syncAction) { + this.transportService = transportService; this.syncAction = syncAction; } @@ -145,6 +146,7 @@ public void onFailure(final Exception e) { private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot, long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener listener) { ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId); + final TaskManager taskManager = transportService.getTaskManager(); ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-) ActionListener wrappedListener = new ActionListener() { @Override @@ -163,7 +165,7 @@ public void onFailure(Exception e) { }; try { new SnapshotSender(syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(), - startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run(); + startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, transportService.getThreadPool().generic(), wrappedListener).run(); } catch (Exception e) { wrappedListener.onFailure(e); } @@ -182,6 +184,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener listener) { + long maxSeenAutoIdTimestamp, Executor executor, ActionListener listener) { this.logger = PrimaryReplicaSyncer.logger; this.syncAction = syncAction; this.task = task; @@ -206,13 +209,14 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener null, null, emptySet()); AtomicBoolean syncActionCalled = new AtomicBoolean(); List resyncRequests = new ArrayList<>(); PrimaryReplicaSyncer.SyncAction syncAction = @@ -62,7 +65,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { assertThat(parentTask, instanceOf(PrimaryReplicaSyncer.ResyncTask.class)); listener.onResponse(new ResyncReplicationResponse()); }; - PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(taskManager, syncAction); + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction); syncer.setChunkSize(new ByteSizeValue(randomIntBetween(1, 10))); int numDocs = randomInt(10); @@ -131,8 +134,9 @@ public void testSyncerOnClosingShard() throws Exception { syncActionCalled.set(true); threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); }; - PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer( - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), syncAction); + TransportService transportService = new MockTransport().createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, boundTransportAddress -> null, null, emptySet()); + PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(transportService, syncAction); syncer.setChunkSize(new ByteSizeValue(1)); // every document is sent off separately int numDocs = 10; diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index c219bbf3c6466..45df79937200f 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -73,9 +73,10 @@ import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; -import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.test.transport.MockTransport; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; @@ -159,7 +160,8 @@ protected class ReplicationGroup implements AutoCloseable, Iterable private volatile ReplicationTargets replicationTargets; private final PrimaryReplicaSyncer primaryReplicaSyncer = new PrimaryReplicaSyncer( - new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet()), + new MockTransport().createTransportService(Settings.EMPTY, threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, address -> null, null, Collections.emptySet()), (request, parentTask, primaryAllocationId, primaryTerm, listener) -> { try { new ResyncAction(request, listener, ReplicationGroup.this).execute();