Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed prover #1

Open
wants to merge 1 commit into
base: taiko
Choose a base branch
from
Open

Conversation

Champii
Copy link

@Champii Champii commented Jul 30, 2024

Introduces the Distributed Prover

How to run

The Orchestrator

Create a file distributed.json containing a JSON array of the workers' IP:PORT like this

[
    "1.2.3.4:8081",
    "5.6.7.8:8081"
]

NOTE: The orchestrator can also be a worker in its own pool

Then run sp1 with these ENV variables

# This is used to minimise the RAM usage. Increase this to reduce the proving time
# Note: you don't need to set it on the workers, it is propagated along the request
export SHARD_BATCH_SIZE=1
export SP1_PROVER=distributed

The Workers

Run the worker TCP server

let listen_addr = "0.0.0.0:8081";
let orchestrator_addr="10.200.0.15"
sdk::serve_worker(listen_addr, orchestrator_addr).await?;

The orchestrator_addr points to the orchestrator address, it is a filter to only accept incoming connection from this host.

@Champii Champii marked this pull request as ready for review July 30, 2024 14:48
core/src/utils/prove_distributed/checkpoints.rs Outdated Show resolved Hide resolved
core/src/utils/prove_distributed/checkpoints.rs Outdated Show resolved Hide resolved
core/src/utils/prove_distributed/mod.rs Outdated Show resolved Hide resolved
core/src/utils/prove_distributed/mod.rs Outdated Show resolved Hide resolved
sdk/src/distributed/worker/pool.rs Show resolved Hide resolved
sdk/src/distributed/worker/pool.rs Show resolved Hide resolved
@Champii Champii force-pushed the sp1_distributed branch 2 times, most recently from ebc9793 to 64e3081 Compare July 31, 2024 14:38
Copy link

@mratsim mratsim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Continuing review later today.

p3-challenger = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
p3-poseidon2 = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
p3-baby-bear = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
p3-symmetric = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same remark as taikoxyz/raiko#302 (comment)

this should be added to the taikoxyz org, or maybe a taikoxyz-patches special org to avoid pollution if across Raiko and Gwyneth we expect lots of long-term patches. cc @Brechtpd

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a related question is what are the updates in plonky3??

runtime.subproof_verifier = Arc::new(NoOpSubproofVerifier);

let (events, _) =
tracing::debug_span!("runtime.trace").in_scope(|| runtime.execute_record().unwrap());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unfamiliar with Rust idioms re logging, will this report the file/line it is in? (core/src/utils/prove_distributed/checkpoint.rs:23)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it won't, it is really just a classic log::debug!(...) macro but with scoped names.

opts: SP1CoreOpts,
records_tx: SyncSender<Vec<ExecutionRecord>>,
deferred: &mut ExecutionRecord,
) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably needs a comment or a link to a markdown document that explains the shared mutable state.

From the code, you have shards that are logical and mapped to execution_shards.
It seems like the distribution happens ahead of time but then after records are processed this execution shard value is incremented.

Given the potential complexity, I would cleanly have only execution functions and only orchestrating functions. And execution functions should only return a status code. I fear mixing processing and state updates will lead to maintenance burden down the line.

This can be refactored for a follow-up PR.

Copy link
Author

@Champii Champii Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree about needing some more documentation.

This code is an adaptation of the classical sp1's proving process taken from https://github.com/taikoxyz/sp1/blob/main/core/src/utils/prove.rs#L234

While I changed the code organization a bit to better fit our needs, I think we shouldn't tamper too much with the substance of the original code or face a possible complete re implementation of the solution whenever they make some breaking changes upstream.

This whole deferred mechanism is a new system that avoids putting too many heavy public values on each shard. It does improve the performances a bit, but implies a new layer of synchronization and value-sharing among the workers and the orchestrator. If we decide to go down the road of implementing a custom solution we could get rid of that mechanism and simplify the execution path drastically. But this has a greater maintenance cost.

// Update the public values & prover state for the shards which contain "cpu events".
for record in records.iter_mut() {
state.shard += 1;
state.execution_shard = record.public_values.execution_shard;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See here: seems like the caller already orchestrated work distribution.


// Update the public values & prover state for the shards which do not contain "cpu events"
// before committing to them.
state.execution_shard += 1;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but then the worker manages some execution variable itself?

state.last_finalize_addr_bits = record.public_values.last_finalize_addr_bits;
state.start_pc = state.next_pc;
record.public_values = state;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the state a local worker state? If yes it probably should be renamed for clarity.

);

let nb_checkpoints_per_workers =
(checkpoints_states.len() as f64 / nb_workers as f64).ceil() as usize;
Copy link

@mratsim mratsim Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can lead to work imbalance

For example dividing 40 items on 12 workers will lead to
a base_chunk_size of 40/12 = 3 so work on the first 11 workers
will be 3 * 11 = 33, and the remainder 7 on the last worker.

Instead of dividing 40 work items on 12 cores into:
3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 7 = 3*11 + 7 = 40
the best scheme will divide into
4, 4, 4, 4, 3, 3, 3, 3, 3, 3, 3, 3 = 4*4 + 3*8 = 40

See https://github.com/mratsim/constantine/blob/master/constantine/threadpool/partitioners.nim#L26-L77

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

@Champii Champii Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch ! I will study those formulas, but is it worth it to implement such a mechanism ?

Here i put a ceil() call that maximize the number of shards on each workers. While it is clearly not ideal, only the last(s) will have a reduced load, which is not a problem since equal load means equal processing time then all the workers will roughly finish at about the same time, and so it won't add any significant time to the proving process.

And as a bonus, if a worker is left work-less, it only reduces the network traffic needed to communicate with it as well as giving us a free quick recovery worker if any other would fail.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ceil leads to different imbalance issue from Mamy mentioned, but yes, it does not impact the processing time as the busiest work node determines the whole time cost.
I just thought another case, let say 15 tasks to 7machine, the distribution between perfect balance & ceiling workload is: (2 2 2 2 2 2 3) vs (3 3 3 3 3 0 0), seems it match a autoscaling model. as the 1st & 2nd have the same processing time, but obviously the 2nd one has less cost (as it uses only 5 machine).

Copy link

@mratsim mratsim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM, some nits about potential load imbalance and future refactoring.

One thing I'm not too sure about are metrics. It's missing but given that we want to stay close to upstream to ease maintenance burden, it might be OK.

);

let nb_checkpoints_per_workers =
(checkpoints_states.len() as f64 / nb_workers as f64).ceil() as usize;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


std::thread::scope(move |s| {
let (records_tx, shard_proofs_handle) =
threads::spawn_prove(prover, s, opts, scope_span.clone(), challenger.clone(), pk);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that spawn multiple threads for the local worker or a single one?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It only spawns one. I agree that the module name could be renamed to something more explicit.

(i * nb_checkpoints_per_worker * opts.shard_batch_size) as u32;

WorkerRequest::Commit(RequestData {
elf: elf.to_vec(),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need to own the buffer here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a matter of simplicity, we may not want to deal with lifetimes in network de/serialized data structures. Considering the kind of RAM usage we are dealing with later in the process, I deemed this small temporary overhead acceptable. What do you think ?

}

async fn spawn_workers() -> Result<BTreeMap<usize, Arc<RwLock<WorkerSocket>>>, WorkerError> {
let ip_list_string = std::fs::read_to_string("distributed.json")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a config file with a default to distributed.json?

We probably want to offer people to have:
distributed-mainnet.json
distributed-devnet.json
distributed-myboosterrollup1.json
to manage multiple infra from the same repo

This can be a future refactor.

Copy link
Author

@Champii Champii Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is indeed a good idea. We should also offer to change the path of such files via some config


async fn spawn_workers() -> Result<BTreeMap<usize, Arc<RwLock<WorkerSocket>>>, WorkerError> {
let ip_list_string = std::fs::read_to_string("distributed.json")
.expect("Sp1 Distributed: Need a `distributed.json` file with a list of IP:PORT");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this handle IPv6 btw?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't try, but I don't see anything that would prevent it. Will double check tho

Copy link
Collaborator

@smtmfft smtmfft left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just have a quick look, will refer to sp1's impl for further understanding.

p3-challenger = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
p3-poseidon2 = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
p3-baby-bear = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
p3-symmetric = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a related question is what are the updates in plonky3??

records_tx: SyncSender<Vec<ExecutionRecord>>,
deferred: &mut ExecutionRecord,
) {
tracing::debug_span!("phase 1 record generator").in_scope(|| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw phase 2 is gone. Do you know the reason why they have 2 phases??

}

// See if any deferred shards are ready to be commited to.
let mut _deferred = deferred.split(false, opts.split_opts);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if all deferred are processed after last checkpoint, do we still need split call here?

);

let nb_checkpoints_per_workers =
(checkpoints_states.len() as f64 / nb_workers as f64).ceil() as usize;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ceil leads to different imbalance issue from Mamy mentioned, but yes, it does not impact the processing time as the busiest work node determines the whole time cost.
I just thought another case, let say 15 tasks to 7machine, the distribution between perfect balance & ceiling workload is: (2 2 2 2 2 2 3) vs (3 3 3 3 3 0 0), seems it match a autoscaling model. as the 1st & 2nd have the same processing time, but obviously the 2nd one has less cost (as it uses only 5 machine).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants