Refactor StateBackendClient to be a higher-level interface #554

Closed
thinkharderdev opened this issue Dec 7, 2022 · 1 comment · Fixed by #560 or #658
Is your feature request related to a problem or challenge? Please describe what you are trying to do.

Currently, persistent state is managed through the StateBackendClient trait, which is a very low-level interface over a key-value store. This is quite flexible but has a couple of drawbacks:

  1. It imposes serde overhead even when serialization is not actually needed. For instance, in the recently added MemoryBackendState everything is kept in memory, so there is no real reason anything needs to be serialized to Vec<u8>.
  2. Much more importantly, it fixes access patterns in a way that prevents specific implementations from using backend-specific features such as batch operations, atomic counters, key expiration, etc. This has the effect of forcing the use of distributed locking in cases where it could potentially be avoided.

For example, with Redis as a backend it is possible to do global task slot allocation atomically, without any distributed locking, using a Redis hash and a little bit of server-side scripting. But there is really no way to express that through the current interface.
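
As a rough sketch of what that could look like (the key layout executor_slots, the function name, and the redis-crate wiring below are illustrative assumptions, not actual Ballista code), a Lua script can walk a hash of executor_id -> free-slot counts and claim slots atomically in one round trip:

// Sketch only: assumes a Redis hash at key "executor_slots" mapping
// executor_id -> free slot count, accessed via the `redis` crate's Script API.
async fn reserve_slots_redis(
    con: &mut redis::aio::Connection,
    num_slots: u32,
) -> redis::RedisResult<Vec<(String, u32)>> {
    let script = redis::Script::new(
        r#"
        local desired = tonumber(ARGV[1])
        local acquired = {}
        local entries = redis.call('HGETALL', KEYS[1])
        for i = 1, #entries, 2 do
            if desired <= 0 then break end
            local executor_id = entries[i]
            local free = tonumber(entries[i + 1])
            local take = math.min(free, desired)
            if take > 0 then
                -- Decrement this executor's free slots atomically
                redis.call('HINCRBY', KEYS[1], executor_id, -take)
                desired = desired - take
                table.insert(acquired, executor_id)
                table.insert(acquired, tostring(take))
            end
        end
        return acquired
        "#,
    );
    // The script returns a flat [executor_id, count, ...] list; pair it up here.
    let flat: Vec<String> = script
        .key("executor_slots")
        .arg(num_slots)
        .invoke_async(con)
        .await?;
    Ok(flat
        .chunks(2)
        .map(|pair| (pair[0].clone(), pair[1].parse().unwrap_or(0)))
        .collect())
}

Because the whole script executes atomically on the Redis server, no distributed lock is needed.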

Describe the solution you'd like

Break the current StateBackendClient trait into two separate higher-level interfaces:

// `async fn` in traits would go through the async_trait crate here.
#[async_trait]
trait JobState {
  async fn save_job(&self, job_id: &str, graph: ExecutionGraph) -> Result<()>;
  async fn fail_job(&self, job_id: &str, reason: String, graph: Option<ExecutionGraph>) -> Result<()>;
  async fn remove_job(&self, job_id: &str) -> Result<()>;
  async fn save_session(&self, ctx: &Arc<SessionContext>) -> Result<()>;
  async fn get_session(&self, session_id: &str) -> Result<Arc<SessionContext>>;
}

#[async_trait]
trait ClusterState {
  async fn reserve_slots(&self, num_slots: u32) -> Result<Vec<ExecutorReservation>>;
  /// Either reserve all `num_slots` slots or none
  async fn reserve_slots_exact(&self, num_slots: u32) -> Result<Vec<ExecutorReservation>>;
  async fn cancel_reservations(&self, reservations: Vec<ExecutorReservation>) -> Result<()>;
  async fn register_executor(&self, metadata: ExecutorMetadata, spec: ExecutorSpecification, reserve: bool) -> Result<Vec<ExecutorReservation>>;
  async fn remove_executor(&self, executor_id: &str) -> Result<()>;
}

This is only a sketch, and these interfaces may need to be tweaked in various ways, but I think the benefits of the general approach are significant:

  1. It pushes locking down as an implementation detail of the state backend, so we can leverage advanced features of particular data stores (see the sketch after this list).
  2. It makes application code more concise, since we no longer have to deal with the fussy details of the low-level interface there.
  3. Implementations can "pick and choose" what needs to be visible globally (e.g. to all schedulers in a cluster), so we have more flexibility in deciding where to make trade-offs between resiliency and performance.
  4. For things where we only need to store state in memory, we can get rid of the serialization overhead.
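
To make points 1 and 4 concrete, here is a minimal sketch (the simplified ExecutorReservation and the slot bookkeeping are assumptions for illustration, not the real Ballista types) of in-memory slot reservation where the Mutex is purely an internal detail and nothing is serialized:

use std::collections::HashMap;
use std::sync::Mutex;

// Hypothetical, simplified stand-in for Ballista's ExecutorReservation.
pub struct ExecutorReservation {
    pub executor_id: String,
}

pub struct InMemoryClusterState {
    // executor_id -> free task slots; the lock never leaks into the trait.
    slots: Mutex<HashMap<String, u32>>,
}

impl InMemoryClusterState {
    // The real trait method would be async; kept synchronous here for brevity.
    pub fn reserve_slots(&self, num_slots: u32) -> Vec<ExecutorReservation> {
        let mut slots = self.slots.lock().unwrap();
        let mut reservations = Vec::new();
        let mut remaining = num_slots;
        for (executor_id, free) in slots.iter_mut() {
            if remaining == 0 {
                break;
            }
            let take = (*free).min(remaining);
            *free -= take;
            remaining -= take;
            for _ in 0..take {
                reservations.push(ExecutorReservation {
                    executor_id: executor_id.clone(),
                });
            }
        }
        reservations
    }
}

A Redis-backed implementation could satisfy the same method with the scripted approach sketched earlier, without the caller ever knowing the difference.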

Note that the above traits don't have a way to watch keys. I think the only place we use that is for listening to executor heartbeats, and that can probably be internalized into the ClusterState implementation; if not, it should be easy to work into this design.

Describe alternatives you've considered

We could simply not do this.

Additional context

As briefly mentioned above, this is mostly motivated by the desire to make Ballista truly highly available. The basic outline of the HA solution is pretty straightforward:

  1. Run multiple schedulers which manage all job state in memory.
  2. Handle sharding of jobs across schedulers on the client side using consistent hashing (see the sketch after this list).
  3. Use Redis as a backend for task slots/executor data so we can do atomic, lock-free slot allocation.
  4. Profit!
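
For point 2, the client-side sharding might look like this minimal consistent-hash ring (illustrative only; all names here are made up, and a production version would add virtual nodes and react to scheduler membership changes):

use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

fn hash_of<T: Hash>(value: &T) -> u64 {
    let mut hasher = DefaultHasher::new();
    value.hash(&mut hasher);
    hasher.finish()
}

struct SchedulerRing {
    // hash point -> scheduler address
    ring: BTreeMap<u64, String>,
}

impl SchedulerRing {
    fn new(schedulers: &[&str]) -> Self {
        let ring = schedulers
            .iter()
            .map(|s| (hash_of(s), s.to_string()))
            .collect();
        Self { ring }
    }

    // Route a job to the first scheduler at or after its hash point,
    // wrapping around to the start of the ring.
    fn scheduler_for(&self, job_id: &str) -> &str {
        let hash = hash_of(&job_id);
        self.ring
            .range(hash..)
            .next()
            .or_else(|| self.ring.iter().next())
            .map(|(_, addr)| addr.as_str())
            .expect("ring must be non-empty")
    }
}

// Usage: every client routes a given job_id to the same scheduler.
// let ring = SchedulerRing::new(&["scheduler-0:50050", "scheduler-1:50050"]);
// let target = ring.scheduler_for("job-abc123");

Since every client hashes the same job_id to the same scheduler, each job's state can live entirely in that scheduler's memory.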
thinkharderdev added the enhancement label on Dec 7, 2022