[RFC] Rabit2 Design Proposal #89
Comments
Thanks for putting this up. Can we elaborate a bit on what the difference is from the original rabit recovery protocol? Specifically, the original rabit benefits from the assumption that only the result of allreduce is needed, so we can just store some of those results in certain nodes.
@tqchen Absolutely, I updated the design with more detail on how we plan to optimize recovery from a series of single-node failures over the entire run time (t1 node1_crash, t2 node2_crash, ...). In terms of storing and restoring checkpoints within the cluster, I plan to stick with the current implementation and offer external checkpoints as an option (in case all nodes are dead or the tracker is dead).
Let's hold a bit on this; we will finalize several parts internally and send them out to the public for review.
Problem
MPI has become the de facto protocol for distributed machine learning synchronization frameworks. From traditional ML such as XGBoost or LightGBM to recent deep learning frameworks such as PyTorch or TensorFlow (Horovod), MPI communication primitives fit well with machine learning's iterative synchronization patterns.
The majority of MPI frameworks, such as Facebook Gloo, NVIDIA NCCL, or Uber Horovod, focus on efficiency, especially GPU communication bandwidth. The driving force has mostly been syncing large neural network models efficiently while avoiding congestion.
With ever larger datasets, distributed ML involves more hosts and runs longer, so the risk of a single point of failure increases. MPI by nature requires all workers to be up, which makes it vulnerable to a single point of failure. In response to this challenge, the majority of ML frameworks support synchronous external checkpoints, where the training job uploads the model to external storage before the next iteration. If any host fails, the scheduler redoes the data distribution, commands workers to pull the last checkpoint from external storage, and resumes training.
The upside of such an implementation is that it separates synchronization failure recovery from the MPI operation implementation, allowing each framework to implement its own checkpointing logic and integrate with device-specific synchronization libraries. The downside is that recovery usually takes much longer, since it involves re-ingesting the entire dataset and bootstrapping all workers from the last checkpoint. Moreover, checkpointing to external storage is a relatively expensive operation, so users choose to do it only every few iterations, which makes the lost training time grow even larger.
On another front, compute resource schedulers (K8S, GCP, Peloton) have introduced preemptive resource allocation to improve overall compute resource utilization. While preemptive allocation does improve the overall utilization ratio and is much cheaper to obtain from cloud vendors, it poses a big stability challenge to large-scale distributed ML jobs.
Architecture
Resilient ML sync is designed to serve as a generic and performant infrastructure piece tackling common issues in large-scale machine learning jobs.
Event-Driven Rabit FSM
We employ an event-based design built on epoll/kqueue. An allreduce worker goes through the following FSM while executing an Allreduce/Broadcast workload. State transitions of a worker are triggered by function calls or by network events from epoll sockets, handled in a non-blocking, threaded fashion.
The function call events from code are:
The network events between agents are:
A minor note: thread-safe, exactly-once delivery is guarded by low-level edge triggering. It offers better thread parallelism when handling heavyweight send/recv events across multiple connections.
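To make the state transitions concrete, below is a minimal sketch (not the actual rabit2 code) of an edge-triggered epoll loop driving per-link state. Names such as `Link`, `LinkState`, and `HandleReadable` are illustrative placeholders; the real implementation would also cover kqueue, the write path, and the full FSM.

```cpp
// Sketch: edge-triggered epoll loop driving a per-link state machine.
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>
#include <unordered_map>
#include <vector>

enum class LinkState { kIdle, kSending, kReceiving, kPeerError, kDone };

struct Link {
  int fd;
  LinkState state = LinkState::kIdle;
};

// Drain the socket until EAGAIN: with EPOLLET we are only notified once per
// readiness transition, so every readable byte must be consumed here.
void HandleReadable(Link* link) {
  char buf[4096];
  while (true) {
    ssize_t n = read(link->fd, buf, sizeof(buf));
    if (n > 0) { /* feed bytes into the Allreduce/Broadcast state machine */ continue; }
    if (n == 0) { link->state = LinkState::kPeerError; break; }   // peer closed / crashed
    if (errno == EAGAIN || errno == EWOULDBLOCK) break;           // fully drained
    link->state = LinkState::kPeerError; break;                   // hard error
  }
}

void EventLoop(std::vector<Link>& links) {
  int epfd = epoll_create1(0);
  std::unordered_map<int, Link*> by_fd;
  for (auto& l : links) {
    fcntl(l.fd, F_SETFL, fcntl(l.fd, F_GETFL) | O_NONBLOCK);
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;  // edge trigger: one wakeup per transition
    ev.data.fd = l.fd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, l.fd, &ev);
    by_fd[l.fd] = &l;
  }
  epoll_event events[64];
  while (true) {
    int n = epoll_wait(epfd, events, 64, /*timeout_ms=*/-1);
    for (int i = 0; i < n; ++i) {
      Link* link = by_fd[events[i].data.fd];
      if (events[i].events & (EPOLLERR | EPOLLHUP)) link->state = LinkState::kPeerError;
      if (events[i].events & EPOLLIN) HandleReadable(link);
      // EPOLLOUT would similarly flush the per-link send buffer until EAGAIN.
    }
  }
}
```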
Failure Recovery
The original rabit paper describes failure recovery with the following steps:
Looking at the communication between agents across the entire topology, this global lock is not always needed. Those five recovery steps can be optimized with the following locking improvements.
Let's model a single task on a single agent as follows:
Data flow
output = reduce(input_1, input_2, input_3, …, input_n, input_self)
where each input can be processed in parallel (e.g., on a dedicated thread). The reduce function enjoys an out-of-order execution property over inputs from its sources, f(a, b, c) = f(f(a, b), f(c)), and hence can prioritize execution on results that have already been received. We introduce two blocking primitives, wait_for_peer_recovery and resume, to handle the peer-error state (state transition logic skipped below).
The algorithm fits well with epoll edge triggering: input_i becoming ready can be observed via EPOLLIN on each socket independently. Most of the heavy-lifting data transfer can be done in parallel, while the reduce function can be executed on partial inputs instead of waiting for the entire input set to become available. It also avoids the global locking of the network topology implied by the data dependencies on any connected graph.
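As an illustration, here is a minimal sketch of that out-of-order fold for an ALLREDUCE MIN task: each peer contribution is folded into the running result the moment its link reports EPOLLIN, rather than after all inputs have arrived. The names `MinReducer` and `OnInputReady` are hypothetical, and the error-state path (wait_for_peer_recovery/resume) is omitted as in the text above.

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <initializer_list>

// Per-task reducer state: folds each peer input as soon as its socket reports
// EPOLLIN, so heavy transfers overlap with the reduction itself.
struct MinReducer {
  int32_t acc;     // running partial result, seeded with the agent's own input
  size_t pending;  // peers we still expect data from

  MinReducer(int32_t self_value, size_t num_peers) : acc(self_value), pending(num_peers) {}

  // Called from the epoll loop when input_i is fully received on one link.
  void OnInputReady(int32_t value) {
    acc = std::min(acc, value);  // out-of-order fold: min(a, min(b, c)) == min(min(a, b), c)
    --pending;
  }

  bool Done() const { return pending == 0; }
};

int main() {
  // Illustrative arrival order 7, 3, 9 from three peers; this agent's own input is 5.
  MinReducer r(/*self_value=*/5, /*num_peers=*/3);
  for (int32_t v : {7, 3, 9}) r.OnInputReady(v);
  if (r.Done()) std::printf("allreduce MIN result = %d\n", r.acc);  // prints 3
  return 0;
}
```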
Lazy Blocking Recovery
Compared with the baseline, we identified two cases where peers doing data exchange with a crashing peer can prioritize their work and block instead of resetting.
Three cases of running ALLREDUCE MIN on a tree-based topology (this also applies to ring-based):
An agent is running in the peer-error state and has already finished communicating with its peers. This is the case where the peer read has consumed all bytes. An agent in the peer-error state can keep running until the next task reads peer state. At the same time, the peer in the error state (the one that put the current agent into the peer-error state) needs to go through the recovery process by pulling the latest version of the model as well as the results collected from other peers via peer reads. Such data lineage can be found in the per-link send buffer (up to one iteration). By storing the model checkpoint along with the per-link send buffers in each agent across the entire cluster (see the rabit paper), rabit2 will be able to recover only the crashed agents.
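A minimal sketch of the per-link lineage bookkeeping this relies on, using illustrative types (`LinkBuffer` and `Agent::ServeRecovery` are not existing rabit2 APIs): each agent retains up to one iteration of sent bytes per link plus its local checkpoint, so a crashed neighbor can pull exactly what it missed while all other agents keep running.

```cpp
#include <cstdint>
#include <unordered_map>
#include <vector>

struct LinkBuffer {
  uint64_t version = 0;          // iteration/model version the buffered bytes belong to
  std::vector<char> sent_bytes;  // everything sent to this peer during that iteration
};

struct Agent {
  uint64_t checkpoint_version = 0;
  std::vector<char> checkpoint;                      // local model checkpoint (see rabit paper)
  std::unordered_map<int, LinkBuffer> send_buffers;  // keyed by peer rank, one iteration deep

  // Record outgoing data so a crashed neighbor can replay it after restart.
  void RecordSend(int peer_rank, const char* data, size_t len, uint64_t version) {
    LinkBuffer& buf = send_buffers[peer_rank];
    if (buf.version != version) { buf.sent_bytes.clear(); buf.version = version; }
    buf.sent_bytes.insert(buf.sent_bytes.end(), data, data + len);
  }

  // Serve a recovery request: the crashed peer pulls the checkpoint and the
  // per-link lineage; surviving agents stay in the peer-error state and keep working.
  void ServeRecovery(int peer_rank, std::vector<char>* model_out,
                     std::vector<char>* lineage_out) const {
    *model_out = checkpoint;
    auto it = send_buffers.find(peer_rank);
    *lineage_out = (it != send_buffers.end()) ? it->second.sent_bytes : std::vector<char>{};
  }
};
```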
Partial Result
As stated in the previous sections, sources send data in a non-blocking, asynchronous fashion. If an agent crashes, the recovered agent can sync with its peer on how many bytes were already sent to the other side. The recovered peer can then send starting from the already-sent offset instead of resending everything, which saves network bandwidth.
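A minimal sketch of this offset negotiation, with illustrative message types (`ResumeRequest`/`ResumeResponse` are not rabit2's actual wire format): the recovered agent asks its peer how many bytes it already received for the task and resumes the transfer from that offset.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

struct ResumeRequest  { uint64_t task_id; };                               // sent by recovered agent
struct ResumeResponse { uint64_t task_id; uint64_t bytes_already_received; };  // peer's reply

// On the recovered agent: the peer reports how far the previous send got;
// we continue from that offset instead of offset 0.
size_t ResumeSend(const std::vector<char>& payload, const ResumeResponse& resp,
                  std::vector<char>* wire_out) {
  size_t offset = static_cast<size_t>(resp.bytes_already_received);
  if (offset >= payload.size()) return 0;                  // peer already has everything
  wire_out->assign(payload.begin() + offset, payload.end());
  return wire_out->size();                                 // bytes that still need to go out
}
```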