Extract batch executor out of cluster service #24102
I think that it's a nice, and long overdue cleanup. I left some comments about the naming.
@@ -244,6 +240,35 @@ protected synchronized void doStop() {
protected synchronized void doClose() {
}

class ClusterServiceTaskBatching extends TaskBatching {
Sorry to get hung up on the naming, I know it's annoying, but should it be ClusterServiceTaskBatcher and TaskBatcher?
Nit: +1 to Jason's suggestion
* Batching support for {@link PrioritizedEsThreadPoolExecutor}
* Tasks that share the same batching key are batched (see {@link BatchingTask#batchingKey})
*/
public abstract class TaskBatching {
Should it be TaskBatcher?
* Represents a runnable task that supports batching.
* Implementors of TaskBatching can subclass this to add a payload to the task.
*/
protected abstract class BatchingTask extends SourcePrioritizedRunnable {
Should it be BatchableTask?
My preference goes to BatchedTask and have the taskIdentity become just task (next to the batching key). Obviously this falls deep into bikeshedding territory so feel totally free to reject and keep what you have.
}
}

public interface BatchingKey<T> {
I think BatchKey?
@@ -1189,7 +1074,7 @@ public void clusterChanged(ClusterChangedEvent event) {
if (master && !event.localNodeMaster()) {
    master = false;
    for (LocalNodeMasterListener listener : listeners) {
-        Executor executor = threadPool.executor(listener.executorName());
+        java.util.concurrent.Executor executor = threadPool.executor(listener.executorName());
I think that you can just import this and there's no need for the FQN here?
@@ -1180,7 +1065,7 @@ public void clusterChanged(ClusterChangedEvent event) {
if (!master && event.localNodeMaster()) {
    master = true;
    for (LocalNodeMasterListener listener : listeners) {
-        Executor executor = threadPool.executor(listener.executorName());
+        java.util.concurrent.Executor executor = threadPool.executor(listener.executorName());
I think that you can just import this and there's no need for the FQN here?
This looks great. I left a bunch of nits + two real comments (re thread pool and the BatchingKey interface)
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class SingleTaskExecutor {
Java docs would be great.
@Override
protected void run(Object batchingKey, List<? extends BatchingTask> tasks, String tasksSummary) {
    ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
I'm doubting if we should introduce generics for the BatchingTask and batchingKey. I'm fine with keeping as is if you don't like the generic version.
}
}
if (toRemove.isEmpty() == false) {
    Object batchingExecutor = toRemove.get(0).batchingKey;
can we assert all of them have the same batching key?
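A minimal sketch of what such an assertion might look like. `KeyedTask` and `BatchKeyCheck` are hypothetical stand-ins, not classes from this PR, and comparing keys by identity (`==`) rather than `equals()` is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the PR's task type; only the batching key matters here.
final class KeyedTask {
    final Object batchingKey;
    final String source;

    KeyedTask(Object batchingKey, String source) {
        this.batchingKey = batchingKey;
        this.source = source;
    }
}

final class BatchKeyCheck {
    // True if every task carries the very same key instance as the first task.
    // Identity (==) rather than equals() is an assumption of this sketch.
    static boolean allShareKey(List<KeyedTask> tasks) {
        if (tasks.isEmpty()) {
            return true;
        }
        Object first = tasks.get(0).batchingKey;
        return tasks.stream().allMatch(t -> t.batchingKey == first);
    }

    public static void main(String[] args) {
        Object key = new Object();
        List<KeyedTask> toRemove = Arrays.asList(new KeyedTask(key, "a"), new KeyedTask(key, "b"));
        // Run with -ea so the assertion is actually evaluated.
        assert allShareKey(toRemove) : "tasks in one batch must share a batching key";
        System.out.println(allShareKey(toRemove));
    }
}
```

In the code under review, the check would presumably sit right after `toRemove` is known to be non-empty.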
}
}
}
for (BatchingTask task : toRemove) {
Can we maybe change onTimeout(task, timeout) to be onTimeout(List<..> tasks, timeout, batchingKey)? Then we can leave the spawning to the implementation. This means we don't need to capture a thread pool for this class. Also, I'm wondering if we should fold threadPool.scheduler() into PrioritizedEsThreadPoolExecutor, as I don't see the need for people to use multiple schedulers.
}

/**
* Action to be implemented by the specific batching implementation
Can we add a comment that all the tasks are guaranteed to have the supplied batching key?
* This method can be called multiple times with different lists before execution.
* This allows grouped task descriptions by the submitting source.
*/
default String describeTasks(List<T> tasks) {
I'm not sure we need this interface. It feels weird to me that the batchingKey knows how to describe the tasks. I would prefer to have an abstract method describeTasks(List<? extends BatchedTask>, batchKey) which can be overridden by the subclasses. This will be consistent with run and onTimeout (see other comment) and will allow us to remove this interface.
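To make the suggestion concrete, here is a rough sketch of a batcher whose three hooks (`run`, `onTimeout`, `describeTasks`) are all abstract methods taking the task list. `SketchBatcher` and `LoggingBatcher` are hypothetical names, the signatures are simplified, and the generics are only one possible way to avoid the cast on the batching key. This is not the shape the PR ended up with.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified batcher: every hook receives the tasks (and key) directly,
// so the key itself does not need to know how to describe tasks.
abstract class SketchBatcher<K, T> {
    protected abstract void run(K batchingKey, List<? extends T> tasks, String tasksSummary);

    protected abstract void onTimeout(List<? extends T> tasks, long timeoutMillis);

    protected abstract String describeTasks(K batchingKey, List<? extends T> tasks);

    // Runs one batch, asking the subclass (not the key) for the summary.
    final void runBatch(K batchingKey, List<? extends T> tasks) {
        run(batchingKey, tasks, describeTasks(batchingKey, tasks));
    }
}

// Toy subclass that records what it was asked to do.
final class LoggingBatcher extends SketchBatcher<String, String> {
    final List<String> log = new ArrayList<>();

    @Override
    protected void run(String batchingKey, List<? extends String> tasks, String tasksSummary) {
        log.add(batchingKey + ": " + tasksSummary);
    }

    @Override
    protected void onTimeout(List<? extends String> tasks, long timeoutMillis) {
        // The implementation decides where (and on which thread) to handle timeouts.
        log.add("timed out after " + timeoutMillis + "ms: " + tasks);
    }

    @Override
    protected String describeTasks(String batchingKey, List<? extends String> tasks) {
        return String.join(", ", tasks);
    }

    public static void main(String[] args) {
        LoggingBatcher batcher = new LoggingBatcher();
        batcher.runBatch("key", Arrays.asList("a", "b"));
        System.out.println(batcher.log);
    }
}
```

With this layout the base class needs neither a thread pool reference for timeouts nor a separate key interface; both concerns move into the subclass.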
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;

public class TaskBatchingTests extends TaskExecutorTests {
since this is the only sub class that inherits from TaskExecutorTests, can we fold them together? or do you already see a clear need for a base class?
The separation into two classes allows the tests in TaskExecutorTests to be run both with and without batching support, i.e. it shows that batching does not have an adverse effect on these tests.
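The pattern described here can be sketched in plain Java (no test framework), with hypothetical names: the checks are written once in a base class against overridable hooks, and a subclass reroutes those hooks through a toy batching layer so the same checks run in both configurations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical base class: the checks are written once against submit()/drain().
class PlainExecutorChecks {
    protected void submit(Runnable task) {
        task.run(); // no batching: run right away
    }

    protected void drain() {
        // nothing is queued in the plain variant
    }

    final boolean runsTasksInSubmissionOrder() {
        List<Integer> seen = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int n = i;
            submit(() -> seen.add(n));
        }
        drain();
        return seen.equals(Arrays.asList(0, 1, 2));
    }
}

// Same checks, but tasks pass through a (toy) batching layer first.
class BatchingExecutorChecks extends PlainExecutorChecks {
    private final List<Runnable> queued = new ArrayList<>();

    @Override
    protected void submit(Runnable task) {
        queued.add(task);
    }

    @Override
    protected void drain() {
        for (Runnable task : queued) {
            task.run();
        }
        queued.clear();
    }
}
```

Calling `runsTasksInSubmissionOrder()` on an instance of either class exercises the same check, which is the property the two-class test layout is meant to demonstrate.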
I see. I'm not sure it merits the base class, but I'm willing to go with it and see how it goes.
@jasontedor @bleskes thanks for the feedback. I've pushed a1e1ec4 which should address all comments.
LGTM.
Thanks for making the name change, can you also change the variable names throughout to reflect? (Again, I'm so sorry for such an annoying request, I think it's a really good PR.)
@@ -114,14 +110,14 @@
private TimeValue slowTaskLoggingThreshold;

private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
private volatile ClusterServiceTaskBatcher taskBatching;
Can you update variable names to reflect the class name changes too?
Good catch. I've pushed 98a4117 which should hopefully fix that (and others)
LGTM.
Thank you @ywelsch.
Refactoring that extracts the task batching functionality from ClusterService and makes it a reusable component that can be tested in isolation.
…ecutor (#24237) Changes in #24102 exposed the following oddity: PrioritizedEsThreadPoolExecutor.getPending() can return Pending entries where pending.task == null. This can happen for example when tasks are added to the pending list while they are in the clean up phase, i.e. TieBreakingPrioritizedRunnable#runAndClean has run already, but afterExecute has not removed the task yet. Instead of safeguarding consumers of the API (as was done before #24102) this changes the executor to not count these tasks as pending at all.
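As a rough illustration of the idea in that commit message (not the actual change in #24237), the sketch below drops entries whose task reference has already been cleared before reporting them as pending. `PendingEntry` and `PendingFilter` are hypothetical stand-ins for the executor's internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a pending-queue entry; task is null once it has been cleaned up.
final class PendingEntry {
    final Runnable task; // may be null after clean-up
    final long insertionOrder;

    PendingEntry(Runnable task, long insertionOrder) {
        this.task = task;
        this.insertionOrder = insertionOrder;
    }
}

final class PendingFilter {
    // Returns only entries that still hold a task, so callers never see task == null.
    static List<PendingEntry> withoutCleanedUp(List<PendingEntry> raw) {
        List<PendingEntry> visible = new ArrayList<>();
        for (PendingEntry entry : raw) {
            if (entry.task != null) {
                visible.add(entry);
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<PendingEntry> raw = Arrays.asList(
            new PendingEntry(() -> {}, 1L),
            new PendingEntry(null, 2L)); // already ran and was cleaned up
        System.out.println(withoutCleanedUp(raw).size());
    }
}
```

Filtering at the source means consumers of the pending list no longer need their own null checks.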
* master: (61 commits) Build: Move plugin cli and tests to distribution tool (elastic#24220) Peer Recovery: remove maxUnsafeAutoIdTimestamp hand off (elastic#24243) Adds version 5.3.2 and backwards compatibility indices for 5.3.1 Add utility method to parse named XContent objects with typed prefix (elastic#24240) MultiBucketsAggregation.Bucket should not extend Writeable (elastic#24216) Don't expose cleaned-up tasks as pending in PrioritizedEsThreadPoolExecutor (elastic#24237) Adds declareNamedObjects methods to ConstructingObjectParser (elastic#24219) ESIntegTestCase.indexRandom should not introduce types. (elastic#24202) Tests: Extend InternalStatsTests (elastic#24212) IndicesQueryCache should delegate the scorerSupplier method. (elastic#24209) Speed up parsing of large `terms` queries. (elastic#24210) [TEST] make sure that the random query_string query generator defines a default_field or a list of fields token_count type : add an option to count tokens (fix elastic#23227) (elastic#24175) Query string default field (elastic#24214) Make Aggregations an abstract class rather than an interface (elastic#24184) [TEST] ensure expected sequence no and version are set when index/delete engine operation has a document failure Extract batch executor out of cluster service (elastic#24102) Add 5.3.1 to bwc versions Added "release-state" support to plugin docs Added examples to cross cluster search of using cluster settings ...
Code refactoring that extracts the executor functionality from ClusterService and makes it a reusable component that can be tested in isolation. This will allow this component to be used in a follow-up separating ClusterService into MasterService and ClusterApplierService, which will both make use of the executor components.
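For readers unfamiliar with the batching idea being extracted, a minimal sketch follows: submissions that share a batching key are collected into one batch, in submission order, so they can be run together. `GroupByKeySketch` is a hypothetical name, and using an `equals()`-based map for the keys is an assumption of the sketch, not a statement about the extracted component.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupByKeySketch {
    // Groups submitted tasks by batching key, keeping keys and tasks in submission order.
    static <K, T> Map<K, List<T>> group(List<Map.Entry<K, T>> submissions) {
        Map<K, List<T>> batches = new LinkedHashMap<>();
        for (Map.Entry<K, T> submission : submissions) {
            batches.computeIfAbsent(submission.getKey(), k -> new ArrayList<>())
                   .add(submission.getValue());
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> submissions = new ArrayList<>();
        submissions.add(new SimpleEntry<>("keyA", "task-1"));
        submissions.add(new SimpleEntry<>("keyB", "task-2"));
        submissions.add(new SimpleEntry<>("keyA", "task-3"));
        // Each map entry is one batch that a single executor invocation could handle.
        System.out.println(group(submissions));
    }
}
```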