Remove redundant fields in ExecutorManager #728
Conversation
Hi @thinkharderdev, could you help review this PR?
I think most of the cleanup makes sense, but I don't think we should remove the caching in ExecutorManager.
So this removes the local in-memory cache of heartbeats, which is there to avoid having to fetch them from shared state. That is indeed redundant when you are using fully in-memory state, but not when you are using shared state across multiple schedulers.
For example, we use redis for managing the cluster state and use pub/sub channels to propagate heartbeats to all schedulers, so we don't have to constantly fetch them from redis.
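For readers following along, here is a minimal sketch (not the actual Ballista code) of the pattern described above: heartbeats arrive on a pub/sub-style channel and each scheduler instance keeps its own in-memory copy, so lookups never hit the shared store. The `ExecutorHeartbeat` stub, the `HeartbeatCache` type, and the use of a tokio broadcast channel in place of a redis subscription are all assumptions made for illustration.

```rust
use std::sync::Arc;

use dashmap::DashMap;
use tokio::sync::broadcast;

/// Stand-in for Ballista's protobuf `ExecutorHeartbeat`; only the fields used here.
#[derive(Clone, Debug)]
struct ExecutorHeartbeat {
    executor_id: String,
    timestamp: u64,
}

/// Local cache of heartbeats. Something (in the deployment described above,
/// a redis pub/sub subscription) publishes heartbeats on the channel and every
/// scheduler instance keeps its own copy, so reads never hit the shared store.
struct HeartbeatCache {
    heartbeats: Arc<DashMap<String, ExecutorHeartbeat>>,
}

impl HeartbeatCache {
    fn start(mut rx: broadcast::Receiver<ExecutorHeartbeat>) -> Self {
        let heartbeats: Arc<DashMap<String, ExecutorHeartbeat>> = Arc::new(DashMap::new());
        let cache = heartbeats.clone();
        // Keep the local map up to date as heartbeats are propagated.
        tokio::spawn(async move {
            while let Ok(hb) = rx.recv().await {
                cache.insert(hb.executor_id.clone(), hb);
            }
        });
        Self { heartbeats }
    }

    /// Answered entirely from memory, without touching the backing store.
    fn last_seen(&self, executor_id: &str) -> Option<u64> {
        self.heartbeats.get(executor_id).map(|hb| hb.timestamp)
    }
}
```

With something like this in place, dropping the cache only costs nothing when the backing state is itself in memory, which is the point being made here.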
Agree. How about reusing InMemoryClusterState for the KeyValueState? There are two main reasons for removing the cache part in ExecutorManager:
- It seems redundant for InMemoryClusterState.
- It's easy to miss some part when ensuring the correctness of the cached data.
Previously I thought it might be unnecessary to cache the cluster state when using state storage. If it's necessary, I think it's better to introduce the InMemoryClusterState into the KeyValueState directly. Then for the cache part, we can focus on InMemoryClusterState. To achieve this, I will raise a few followup commits to this PR.
Hi @thinkharderdev, I have now copied the cache logic into the KeyValueState.
```diff
@@ -57,6 +56,8 @@ pub struct KeyValueState<
 > {
     /// Underlying `KeyValueStore`
     store: S,
+    /// ExecutorHeartbeat cache, executor_id -> ExecutorHeartbeat
+    executor_heartbeats: Arc<DashMap<String, ExecutorHeartbeat>>,
```
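As a rough illustration of what such a cached write path can look like (a sketch only, using hypothetical stand-in types rather than the real `KeyValueStore` trait, key layout, or protobuf encoding): persist the heartbeat to the shared store first, then update the local cache, so the cache may lag the store but never lead it.

```rust
use std::sync::Arc;

use dashmap::DashMap;

#[derive(Clone)]
struct ExecutorHeartbeat {
    executor_id: String,
    timestamp: u64,
}

/// Minimal stand-in for the real `KeyValueStore` trait.
#[async_trait::async_trait]
trait KeyValueStore: Send + Sync {
    async fn put(&self, key: String, value: Vec<u8>) -> Result<(), String>;
}

/// Sketch of the cached write path: store first, cache second.
struct HeartbeatState<S: KeyValueStore> {
    store: S,
    executor_heartbeats: Arc<DashMap<String, ExecutorHeartbeat>>,
}

impl<S: KeyValueStore> HeartbeatState<S> {
    async fn save_executor_heartbeat(&self, hb: ExecutorHeartbeat) -> Result<(), String> {
        // The real code encodes the protobuf message; a timestamp stands in here.
        let encoded = hb.timestamp.to_be_bytes().to_vec();
        self.store
            .put(format!("heartbeats/{}", hb.executor_id), encoded)
            .await?;
        self.executor_heartbeats.insert(hb.executor_id.clone(), hb);
        Ok(())
    }

    /// Reads are served from the cache without touching the store.
    fn executor_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat> {
        self.executor_heartbeats
            .get(executor_id)
            .map(|e| e.value().clone())
    }
}
```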
I think we also need to cache ExecutorMetadata and ExecutorData here to preserve the functionality that was previously in ExecutorManager.
For ExecutorMetadata, it can be cached. However, for ExecutorData, I don't think it should be cached, as it changes frequently and it's hard to keep it consistent with the copy stored in the backend storage when there are multiple active schedulers.
Actually, I don't think it's necessary to introduce multiple active schedulers in the Ballista cluster. Maybe only HA is needed. If so, we can cache the ExecutorData with the executor's available slots to avoid frequent visits to the backend storage.
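To make the distinction concrete, here is a sketch of a read-through cache for ExecutorMetadata, which is effectively fixed for the lifetime of an executor and therefore safe to cache, unlike frequently changing slot counts. The `MetadataStore` trait, the field layout, and the `MetadataCache` type are hypothetical stand-ins, not Ballista APIs.

```rust
use std::sync::Arc;

use dashmap::DashMap;

/// Stand-in for Ballista's `ExecutorMetadata` (host/port do not change for a
/// running executor, which is what makes this safe to cache).
#[derive(Clone)]
struct ExecutorMetadata {
    id: String,
    host: String,
    port: u16,
}

/// Minimal stand-in for the metadata lookup on the backing store.
#[async_trait::async_trait]
trait MetadataStore: Send + Sync {
    async fn get_executor_metadata(&self, executor_id: &str) -> Result<ExecutorMetadata, String>;
}

struct MetadataCache<S: MetadataStore> {
    store: S,
    cache: Arc<DashMap<String, ExecutorMetadata>>,
}

impl<S: MetadataStore> MetadataCache<S> {
    /// Read-through: serve from memory if present, otherwise fetch once and cache.
    async fn get(&self, executor_id: &str) -> Result<ExecutorMetadata, String> {
        if let Some(meta) = self.cache.get(executor_id) {
            return Ok(meta.value().clone());
        }
        let meta = self.store.get_executor_metadata(executor_id).await?;
        self.cache.insert(executor_id.to_string(), meta.clone());
        Ok(meta)
    }
}
```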
Ah, yeah, I don't think ExecutorData is used at all anymore actually. We use ExecutorTaskSlots to store the available task slots for reservation purposes, so I think we should be fine just caching ExecutorMetadata.

> Actually, I don't think it's necessary to introduce multiple active schedulers in the Ballista cluster.

We run multiple active schedulers. The scheduler does a non-trivial amount of work in scheduling, and it's important for us to be able to scale that layer horizontally.
Regarding horizontal scaling, I actually haven't hit any scheduler bottlenecks since the earlier optimizations were introduced. Maybe a single scheduler is enough.
We have :). The problem is mostly the cost of plan serialization. We frequently see execution plans with 50k+ files, and such a plan is very large and causes a lot of CPU and memory overhead to serialize.
Aside from that, the other issue is that zero-downtime deployments are much easier with multiple active schedulers. It is a solvable problem using leader election, but for our use case it was preferable to just run multiple active schedulers and solve both the deployment and the scalability issue.
I think the plan is also cached, so we only need to do the serialization once per query. Do you still have that problem?
The reason I don't prefer multiple active schedulers is the contention for the execution slots, which leads to inefficient slot updates and makes future consistent-hashing-based task assignment infeasible.
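As an illustration of the kind of plan caching being referred to (a sketch, not the actual Ballista implementation), a per-stage cache of the serialized bytes means the expensive encoding happens once and every task launch for that stage reuses it. `StagePlanCache` and its key layout are hypothetical.

```rust
use std::sync::Arc;

use dashmap::DashMap;

/// Hypothetical cache of already-serialized stage plans: the protobuf encoding
/// happens once per (job, stage) and every task launch for that stage reuses
/// the same bytes.
struct StagePlanCache {
    encoded: DashMap<(String, usize), Arc<Vec<u8>>>,
}

impl StagePlanCache {
    fn new() -> Self {
        Self {
            encoded: DashMap::new(),
        }
    }

    /// Return the cached bytes, encoding the plan only on the first call.
    fn get_or_encode<F>(&self, job_id: &str, stage_id: usize, encode: F) -> Arc<Vec<u8>>
    where
        F: FnOnce() -> Vec<u8>,
    {
        self.encoded
            .entry((job_id.to_string(), stage_id))
            .or_insert_with(|| Arc::new(encode()))
            .value()
            .clone()
    }
}
```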
If multiple active schedulers are required, I think your current design of ExecutionReservation is reasonable. Since consistent-hashing-based task assignment may only be possible with a single active scheduler and no slot contention, we have to use different policies for different scheduler deployments.
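For context, consistent-hashing-based assignment would look roughly like the sketch below: a hash ring maps a task key (for example, the location of its input partition) to an executor, so the same key keeps landing on the same executor as long as cluster membership is stable. This is purely illustrative and not part of this PR; `HashRing` and its API are hypothetical.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

/// Illustrative consistent-hash ring mapping hash points to executor ids.
struct HashRing {
    ring: BTreeMap<u64, String>, // hash point -> executor_id
}

impl HashRing {
    fn new(executors: &[String], replicas: usize) -> Self {
        let mut ring = BTreeMap::new();
        for exec in executors {
            // Virtual nodes smooth out the distribution across executors.
            for i in 0..replicas {
                ring.insert(hash_of(&(exec, i)), exec.clone());
            }
        }
        Self { ring }
    }

    /// First executor clockwise from the key's hash, wrapping around the ring.
    fn assign(&self, task_key: &str) -> Option<&String> {
        let h = hash_of(&task_key);
        self.ring
            .range(h..)
            .next()
            .or_else(|| self.ring.iter().next())
            .map(|(_, exec)| exec)
    }
}

fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}
```

With multiple schedulers contending for slots, the executor chosen by the ring may not have a free slot, which is the contention problem mentioned above.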
Nice work. Thanks @yahoNanJing!
Which issue does this PR close?
Closes #723.
Rationale for this change
After the cluster state refactoring in #658, the state cache in ExecutorManager becomes redundant. It's better to remove it.
What changes are included in this PR?
The following fields in ExecutorManager are removed:
- slots_policy
- executor_metadata
- executors_heartbeat
- executor_data
The RoundRobinLocal slot policy is removed, and SlotsPolicy is renamed to TaskDistribution. The buggy executor check for pull-staged task scheduling is also removed.
Are there any user-facing changes?