Skip to content

Commit

Permalink
Add a version number to WorkerEnvelope
Browse files Browse the repository at this point in the history
  • Loading branch information
Champii committed Jul 4, 2024
1 parent 85fff43 commit 2680485
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ hyper = { version = "0.14.27", features = ["server"] }
reqwest = { version = "0.11.22", features = ["json"] }
url = "2.5.0"
async-trait = "0.1.80"
async-channel = "2.3.1"

# crypto
kzg = { package = "rust-kzg-zkcrypto", git = "https://github.com/brechtpd/rust-kzg.git", branch = "sp1-patch", default-features = false }
Expand All @@ -159,7 +160,6 @@ secp256k1 = { version = "0.29", default-features = false, features = [
"global-context",
"recovery",
] }
async-channel = "2.3.1"

# macro
syn = { version = "1.0", features = ["full"] }
Expand Down
2 changes: 1 addition & 1 deletion host/src/server/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn listen_worker(state: ProverState) {

// We purposely don't spawn the task here, as we want to block to limit the number
// of concurrent connections to one.
if let Err(e) = handle_worker_socket(WorkerSocket::new(socket)).await {
if let Err(e) = handle_worker_socket(WorkerSocket::from_streamm_stream(socket)).await {

Check warning on line 63 in host/src/server/worker.rs

View workflow job for this annotation

GitHub Actions / check-for-typos

"streamm" should be "stream".
error!("Error while handling worker socket: {:?}", e);
}
}
Expand Down
2 changes: 2 additions & 0 deletions lib/src/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub enum WorkerError {
Serde(#[from] bincode::Error),
#[error("Worker invalid magic number")]
InvalidMagicNumber,
#[error("Worker invalid version")]
InvalidVersion,
#[error("Worker invalid request")]
InvalidRequest,
#[error("Worker invalid response")]
Expand Down
21 changes: 19 additions & 2 deletions provers/sp1/driver/src/distributed/worker/envelope.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
use raiko_lib::prover::WorkerError;
use serde::{Deserialize, Serialize};

use crate::WorkerProtocol;

#[derive(Debug, Serialize, Deserialize)]
pub struct WorkerEnvelope {
pub magic: u64,
pub data: WorkerProtocol,
magic: u64,
version: u64,
data: WorkerProtocol,
}

impl WorkerEnvelope {
pub fn data(self) -> Result<WorkerProtocol, WorkerError> {
if self.magic != 0xdeadbeef {
return Err(WorkerError::InvalidMagicNumber);
}

if self.version != include!("../../../worker.version") {
return Err(WorkerError::InvalidVersion);
}

Ok(self.data)
}
}

impl From<WorkerProtocol> for WorkerEnvelope {
fn from(data: WorkerProtocol) -> Self {
WorkerEnvelope {
magic: 0xdeadbeef,
version: include!("../../../worker.version"),
data,
}
}
Expand Down
14 changes: 5 additions & 9 deletions provers/sp1/driver/src/distributed/worker/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ use crate::{PartialProofRequest, WorkerEnvelope, WorkerProtocol};
const PAYLOAD_MAX_SIZE: usize = 1 << 26;

pub struct WorkerSocket {
pub socket: tokio::net::TcpStream,
socket: tokio::net::TcpStream,
}

impl WorkerSocket {
pub async fn connect(url: &str) -> Result<Self, WorkerError> {
let stream = tokio::net::TcpStream::connect(url).await?;
let socket = tokio::net::TcpStream::connect(url).await?;

Ok(WorkerSocket { socket: stream })
Ok(WorkerSocket::from_streamm_stream(socket))

Check warning on line 18 in provers/sp1/driver/src/distributed/worker/socket.rs

View workflow job for this annotation

GitHub Actions / check-for-typos

"streamm" should be "stream".
}

pub fn new(socket: tokio::net::TcpStream) -> Self {
pub fn from_streamm_stream(socket: tokio::net::TcpStream) -> Self {

Check warning on line 21 in provers/sp1/driver/src/distributed/worker/socket.rs

View workflow job for this annotation

GitHub Actions / check-for-typos

"streamm" should be "stream".
WorkerSocket { socket }
}

Expand All @@ -42,11 +42,7 @@ impl WorkerSocket {

let envelope: WorkerEnvelope = bincode::deserialize(&data)?;

if envelope.magic != 0xdeadbeef {
return Err(WorkerError::InvalidMagicNumber);
}

Ok(envelope.data)
envelope.data()
}

// TODO: Add a timeout
Expand Down
1 change: 1 addition & 0 deletions provers/sp1/driver/worker.version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
1

0 comments on commit 2680485

Please sign in to comment.