
Distributed prover #1

Open · wants to merge 1 commit into base: taiko
634 changes: 335 additions & 299 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions Cargo.toml
@@ -79,6 +79,7 @@ p3-uni-stark = "0.1.3-succinct"
p3-maybe-rayon = "0.1.3-succinct"
p3-bn254-fr = "0.1.3-succinct"


# For local development.

# p3-air = { path = "../Plonky3/air" }
@@ -101,3 +102,12 @@ p3-bn254-fr = "0.1.3-succinct"
# p3-uni-stark = { path = "../Plonky3/uni-stark" }
# p3-maybe-rayon = { path = "../Plonky3/maybe-rayon" }
# p3-bn254-fr = { path = "../Plonky3/bn254-fr" }

# Patch Plonky3 for Serialize and Deserialize of DuplexChallenger
[patch.crates-io]
p3-field = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
p3-challenger = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
p3-poseidon2 = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
p3-baby-bear = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
p3-symmetric = { git = "https://github.com/Champii/Plonky3.git", branch = "serde_patch" }
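For context, the stated purpose of the patched fork is adding Serialize/Deserialize support for DuplexChallenger so the challenger state can be shipped between machines. A rough, illustrative sketch of what such a change might look like (field names are assumptions, not the actual fork's diff):

use serde::{Deserialize, Serialize};

// Hypothetical sketch: derive serde on the challenger so its state can be
// serialized by the orchestrator and restored by a worker. The exact fields
// of Plonky3's DuplexChallenger may differ.
#[derive(Clone, Serialize, Deserialize)]
pub struct DuplexChallenger<F, P, const WIDTH: usize> {
    pub sponge_state: [F; WIDTH],
    pub input_buffer: Vec<F>,
    pub output_buffer: Vec<F>,
    pub permutation: P,
}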
Same remark as taikoxyz/raiko#302 (comment)

This fork should live under the taikoxyz org, or perhaps in a dedicated taikoxyz-patches org to avoid clutter, if we expect many long-term patches across Raiko and Gwyneth. cc @Brechtpd

Collaborator:

A related question: what are the actual updates in the Plonky3 fork?


1 change: 1 addition & 0 deletions core/src/utils/mod.rs
@@ -6,6 +6,7 @@ mod options;
#[cfg(any(test, feature = "programs"))]
mod programs;
mod prove;
pub mod prove_distributed;
mod serde;
mod tracer;

154 changes: 154 additions & 0 deletions core/src/utils/prove_distributed/checkpoints.rs
@@ -0,0 +1,154 @@
use std::sync::{mpsc::SyncSender, Arc};

pub use crate::{air::PublicValues, runtime::Program, stark::RiscvAir};

use crate::{
    runtime::{ExecutionRecord, NoOpSubproofVerifier, Runtime},
    stark::{MachineProver, MachineRecord},
    utils::{baby_bear_poseidon2::Val, BabyBearPoseidon2, SP1CoreOpts},
};

use super::Checkpoint;

fn trace_checkpoint(
    program: Program,
    checkpoint: Checkpoint,
    opts: SP1CoreOpts,
) -> (Vec<ExecutionRecord>, Checkpoint) {
    let mut runtime = Runtime::recover(program, checkpoint, opts);

    runtime.subproof_verifier = Arc::new(NoOpSubproofVerifier);

    let (events, _) =
        tracing::debug_span!("runtime.trace").in_scope(|| runtime.execute_record().unwrap());
I'm unfamiliar with Rust logging idioms; will this report the file/line it is emitted from? (core/src/utils/prove_distributed/checkpoint.rs:23)

Author:

No, it won't; it is really just a classic log::debug!(...) macro, but with scoped names.
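For illustration, a minimal sketch of the span-scoping pattern discussed here, assuming the tracing crate (not code from this PR):

use tracing::{debug, debug_span};

fn demo() {
    // The span only attaches the name "runtime.trace" to events emitted while
    // the closure runs; by itself it does not record the caller's file/line.
    debug_span!("runtime.trace").in_scope(|| {
        debug!("executing record"); // rendered under the span by the subscriber
    });
}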


    let state = runtime.state.clone();

    (events, state)
}

pub fn process<P: MachineProver<BabyBearPoseidon2, RiscvAir<Val>>>(
    prover: &P,
    program: &Program,
    checkpoint: Checkpoint,
    nb_checkpoints: usize,
    state: PublicValues<u32, u32>,
    opts: SP1CoreOpts,
    records_tx: SyncSender<Vec<ExecutionRecord>>,
    deferred: &mut ExecutionRecord,
    is_deferred: bool,
) {
    if is_deferred {
        process_deferred(program, checkpoint, state, opts, records_tx, deferred);
    } else {
        process_regular(
            prover,
            program,
            checkpoint,
            nb_checkpoints,
            state,
            opts,
            records_tx,
            deferred,
        );
    }
}

fn process_regular<P: MachineProver<BabyBearPoseidon2, RiscvAir<Val>>>(
    prover: &P,
    program: &Program,
    mut checkpoint: Checkpoint,
    nb_checkpoints: usize,
    mut state: PublicValues<u32, u32>,
    opts: SP1CoreOpts,
    records_tx: SyncSender<Vec<ExecutionRecord>>,
    deferred: &mut ExecutionRecord,
) {
This probably needs a comment, or a link to a markdown document, that explains the shared mutable state.

From the code, you have logical shards that are mapped to execution shards. It seems the distribution happens ahead of time, but the execution-shard counter is then incremented again after records are processed.

Given the potential complexity, I would cleanly separate pure execution functions from orchestrating functions, with execution functions returning only a status code. I fear that mixing processing and state updates will become a maintenance burden down the line.

This can be refactored in a follow-up PR.
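One possible reading of this suggestion, sketched with placeholder types (an assumption, not code from this PR): worker-side execution only produces records and a status, while the orchestrator alone mutates the shared prover state.

// Hypothetical refactor sketch: execution returns records plus a status code;
// all shard-counter and public-value updates live in the orchestrator.
enum ExecStatus {
    Done,
    MoreCheckpoints,
}

type Record = Vec<u8>; // stand-in for ExecutionRecord

fn execute_checkpoint() -> (Vec<Record>, ExecStatus) {
    // Trace the checkpoint and build records; no shared-state mutation here.
    (Vec::new(), ExecStatus::Done)
}

fn orchestrate() {
    loop {
        let (_records, status) = execute_checkpoint();
        // Orchestrator-owned bookkeeping would go here: bump shard /
        // execution_shard, fill in public values, forward records to provers.
        if matches!(status, ExecStatus::Done) {
            break;
        }
    }
}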

Author (@Champii), Aug 1, 2024:

I agree that this needs more documentation.

This code is an adaptation of SP1's standard proving process, taken from https://github.com/taikoxyz/sp1/blob/main/core/src/utils/prove.rs#L234

While I changed the code organization a bit to better fit our needs, I think we shouldn't tamper too much with the substance of the original code, or we risk a complete re-implementation of the solution whenever upstream makes breaking changes.

This whole deferred mechanism is a new system that avoids putting too many heavy public values on each shard. It improves performance somewhat, but it implies a new layer of synchronization and value-sharing between the workers and the orchestrator. If we decide to go down the road of implementing a custom solution, we could drop that mechanism and simplify the execution path drastically, but a custom solution carries a greater maintenance cost.
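As a simplified illustration of the deferral mechanism described above (placeholder types and fields, an assumption rather than SP1's actual API): heavy events are stripped from each shard's record into a shared pool, and only emitted as dedicated records once enough have accumulated, or unconditionally at the end of execution.

// Hypothetical sketch of the defer/split pattern.
struct Record {
    cpu_events: Vec<u64>,
    heavy_events: Vec<u64>, // e.g. precompile or memory-init events
}

struct Deferred {
    heavy_events: Vec<u64>,
}

impl Deferred {
    // Move the heavy events out of a record into the deferred pool.
    fn append(&mut self, record: &mut Record) {
        self.heavy_events.append(&mut record.heavy_events);
    }

    // Emit full-size records while enough heavy events have accumulated, plus
    // one final partial record when `force` is set (end of execution).
    fn split(&mut self, force: bool, threshold: usize) -> Vec<Record> {
        let mut out = Vec::new();
        while self.heavy_events.len() >= threshold.max(1)
            || (force && !self.heavy_events.is_empty())
        {
            let n = threshold.max(1).min(self.heavy_events.len());
            out.push(Record {
                cpu_events: Vec::new(),
                heavy_events: self.heavy_events.drain(..n).collect(),
            });
        }
        out
    }
}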

tracing::debug_span!("phase 1 record generator").in_scope(|| {
Collaborator:

I see that phase 2 is gone. Do you know why upstream has two phases?

        let mut processed_checkpoints = 0;

        while processed_checkpoints < nb_checkpoints {
            log::info!(
                "Processing checkpoint {}/{}",
                processed_checkpoints + 1,
                nb_checkpoints
            );
            // Trace the checkpoint and reconstruct the execution records.
            let (mut records, new_checkpoint) = tracing::debug_span!("trace checkpoint")
                .in_scope(|| trace_checkpoint(program.clone(), checkpoint, opts));

            checkpoint = new_checkpoint;

            // Update the public values & prover state for the shards which contain "cpu events".
            for record in records.iter_mut() {
                state.shard += 1;
                state.execution_shard = record.public_values.execution_shard;
See here: it seems the caller already orchestrated the work distribution.

                state.start_pc = record.public_values.start_pc;
                state.next_pc = record.public_values.next_pc;
                record.public_values = state;
            }

            // Generate the dependencies.
            tracing::debug_span!("generate dependencies")
                .in_scope(|| prover.machine().generate_dependencies(&mut records, &opts));

            // Defer events that are too expensive to include in every shard.
            for record in records.iter_mut() {
                deferred.append(&mut record.defer());
            }

            // See if any deferred shards are ready to be committed to.
            let mut _deferred = deferred.split(false, opts.split_opts);
Collaborator:

If all deferred records are processed after the last checkpoint, do we still need the split call here?


            // Update the public values & prover state for the shards which do not contain "cpu events"
            // before committing to them.
            state.execution_shard += 1;
But then the worker manages some of the execution state itself?


            records_tx.send(records).unwrap();

            processed_checkpoints += 1;
        }
    });
}

fn process_deferred(
    program: &Program,
    checkpoint: Checkpoint,
    mut state: PublicValues<u32, u32>,
    opts: SP1CoreOpts,
    records_tx: SyncSender<Vec<ExecutionRecord>>,
    deferred: &mut ExecutionRecord,
) {
    tracing::debug_span!("phase 1 record generator").in_scope(|| {
        // Trace the checkpoint and reconstruct the execution records.
        let (mut records, _) = tracing::debug_span!("trace checkpoint")
            .in_scope(|| trace_checkpoint(program.clone(), checkpoint, opts));

        // Update the public values & prover state for the shards which contain "cpu events".
        for record in records.iter_mut() {
            // state.shard += 1;
            state.execution_shard = record.public_values.execution_shard;
            state.start_pc = record.public_values.start_pc;
            state.next_pc = record.public_values.next_pc;
            record.public_values = state;
        }

        // See if any deferred shards are ready to be committed to.
        let mut deferred = deferred.split(true, opts.split_opts);

        // Update the public values & prover state for the shards which do not contain "cpu events"
        // before committing to them.

        for record in deferred.iter_mut() {
            state.shard += 1;
            state.previous_init_addr_bits = record.public_values.previous_init_addr_bits;
            state.last_init_addr_bits = record.public_values.last_init_addr_bits;
            state.previous_finalize_addr_bits = record.public_values.previous_finalize_addr_bits;
            state.last_finalize_addr_bits = record.public_values.last_finalize_addr_bits;
            state.start_pc = state.next_pc;
            record.public_values = state;
        }
Is the state a local worker state? If so, it should probably be renamed for clarity.


        records_tx.send(deferred).unwrap();
    });
}